diff --git a/.github/repolinter.json b/.github/repolinter.json new file mode 100644 index 0000000..2c38bab --- /dev/null +++ b/.github/repolinter.json @@ -0,0 +1,77 @@ +{ + "$schema": "https://raw.githubusercontent.com/todogroup/repolinter/master/rulesets/schema.json", + "version": 2, + "axioms": {}, + "rules": { + "license-file-exists": { + "level": "error", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["LICENSE*", "COPYING*"] + } + } + }, + "readme-file-exists": { + "level": "error", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["README*"] + } + } + }, + "contributing-file-exists": { + "level": "warning", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["CONTRIBUTING*"] + } + } + }, + "changelog-file-exists": { + "level": "warning", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["CHANGELOG*"] + } + } + }, + "code-of-conduct-file-exists": { + "level": "warning", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["CODE_OF_CONDUCT*", "CODE-OF-CONDUCT*", ".github/CODE_OF_CONDUCT*"] + } + } + }, + "security-file-exists": { + "level": "warning", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["SECURITY*", ".github/SECURITY*"] + } + } + }, + "notice-file-exists": { + "level": "warning", + "rule": { + "type": "file-existence", + "options": { + "globsAny": ["NOTICE*"] + } + } + }, + "license-detectable-by-licensee": { + "level": "warning", + "rule": { + "type": "license-detectable-by-licensee", + "options": {} + } + } + } +} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 757042e..01ad7c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,4 +101,6 @@ jobs: with: fetch-depth: 0 - name: DCO check - uses: dcoapp/app@v1 + uses: christophebedard/dco-check@0.5.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/repolinter.yml b/.github/workflows/repolinter.yml index 
1c07d88..3f1add9 100644 --- a/.github/workflows/repolinter.yml +++ b/.github/workflows/repolinter.yml @@ -21,4 +21,4 @@ jobs: - name: Run Repolinter uses: todogroup/repolinter-action@v1 with: - config_url: https://raw.githubusercontent.com/lfai/foundation/main/repolinter.json + config_file: .github/repolinter.json diff --git a/README.md b/README.md index 3605c12..0455a84 100644 --- a/README.md +++ b/README.md @@ -4,78 +4,138 @@ [![PyPI version](https://img.shields.io/pypi/v/botanu)](https://pypi.org/project/botanu/) [![Python](https://img.shields.io/badge/python-3.9%20|%203.10%20|%203.11%20|%203.12%20|%203.13-blue)](https://www.python.org/) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) +[![LF AI & Data](https://img.shields.io/badge/LF%20AI%20%26%20Data-member-blue)](https://lfaidata.foundation/) -OpenTelemetry-native run-level cost attribution for AI workflows. +**Run-level cost attribution for AI workflows, built on OpenTelemetry.** -## Overview +Botanu adds **runs** on top of distributed tracing. A run represents one business transaction that may span multiple LLM calls, database queries, and microservices. By correlating every operation to a stable `run_id`, you get per-transaction cost attribution without sampling artifacts. -Botanu adds **runs** on top of distributed tracing. A run represents a single business transaction that may span multiple LLM calls, database queries, and services. By correlating all operations to a stable `run_id`, you get accurate cost attribution without sampling artifacts. +## How It Works + +``` +User Request + | + v + Entry Service Intermediate Service LLM / DB + @botanu_use_case --> enable() propagates --> auto-instrumented + creates run_id run_id via W3C Baggage spans tagged with run_id +``` + +1. **Entry point** creates a `run_id` with `@botanu_use_case` +2. **Every service** calls `enable()` to propagate the `run_id` via W3C Baggage +3. 
**All spans** across all services share the same `run_id` +4. **Traces export** to your OTel Collector via OTLP (configured by environment variable) ## Quick Start +### Install + +```bash +pip install botanu +``` + +One install. Includes OTel SDK, OTLP exporter, and auto-instrumentation for 50+ libraries. + +### Instrument Your Code + +**Entry service** (where the workflow begins): + ```python from botanu import enable, botanu_use_case -enable(service_name="my-service") +enable() # reads config from env vars -@botanu_use_case(name="my_workflow") -def my_function(): - data = db.query(...) - result = llm.complete(...) +@botanu_use_case(name="Customer Support") +async def handle_ticket(ticket_id: str): + data = await db.query(ticket_id) + result = await llm.complete(data) return result ``` -## Installation +**Every other service** (intermediate, downstream): -```bash -pip install "botanu[all]" +```python +from botanu import enable + +enable() # propagates run_id from incoming request ``` -| Extra | Description | -|-------|-------------| -| `sdk` | OpenTelemetry SDK + OTLP exporter | -| `instruments` | Auto-instrumentation for HTTP, databases | -| `genai` | Auto-instrumentation for LLM providers | -| `all` | All of the above (recommended) | +That's it. No collector endpoint in code. No manual span creation. -## What Gets Auto-Instrumented +### Configure via Environment Variables -When you install `botanu[all]`, the following are automatically tracked: +All configuration is via environment variables. 
**Zero hardcoded values in code.** -- **LLM Providers** — OpenAI, Anthropic, Vertex AI, Bedrock, Azure OpenAI -- **Databases** — PostgreSQL, MySQL, SQLite, MongoDB, Redis -- **HTTP** — requests, httpx, urllib3, aiohttp -- **Frameworks** — FastAPI, Flask, Django, Starlette -- **Messaging** — Celery, Kafka +| Variable | Description | Default | +|----------|-------------|---------| +| `OTEL_EXPORTER_OTLP_ENDPOINT` | Collector endpoint | `http://localhost:4318` | +| `OTEL_SERVICE_NAME` | Service name | `unknown_service` | +| `BOTANU_ENVIRONMENT` | Deployment environment | `production` | -No manual instrumentation required. +```yaml +# docker-compose.yml / Kubernetes deployment +environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 + - OTEL_SERVICE_NAME=my-service +``` -## Kubernetes Deployment +See [Configuration Reference](./docs/getting-started/configuration.md) for all options. -For large-scale deployments (2000+ services): +## Auto-Instrumentation -| Service Type | Code Change | Kubernetes Config | -|--------------|-------------|-------------------| -| Entry point | `@botanu_use_case` decorator | Annotation | -| Intermediate | None | Annotation only | +Everything is included and auto-detected. 
If the library is in your dependencies, it gets instrumented: -```yaml -# Intermediate services - annotation only, no code changes -metadata: - annotations: - instrumentation.opentelemetry.io/inject-python: "true" -``` +| Category | Libraries | +|----------|-----------| +| **LLM Providers** | OpenAI, Anthropic, Vertex AI, Google GenAI, LangChain, Ollama, CrewAI | +| **Web Frameworks** | FastAPI, Flask, Django, Starlette, Falcon, Pyramid, Tornado | +| **HTTP Clients** | requests, httpx, urllib3, aiohttp | +| **Databases** | PostgreSQL (psycopg2/3, asyncpg), MySQL, SQLite, MongoDB, Redis, SQLAlchemy, Elasticsearch, Cassandra | +| **Messaging** | Celery, Kafka, RabbitMQ (pika) | +| **AWS** | botocore, boto3 (SQS) | +| **gRPC** | Client + Server | +| **Runtime** | logging, threading, asyncio | + +No manual instrumentation required. Libraries not installed are silently skipped. + +## Kubernetes at Scale + +For large deployments (2000+ services), only entry points need code changes: -Auto-instrumentation captures all HTTP calls including retries (requests, httpx, aiohttp, urllib3). +| Service Type | Code Change | Configuration | +|--------------|-------------|---------------| +| Entry point | `@botanu_use_case` decorator | `OTEL_EXPORTER_OTLP_ENDPOINT` env var | +| Intermediate | `enable()` call only | `OTEL_EXPORTER_OTLP_ENDPOINT` env var | See [Kubernetes Deployment Guide](./docs/integration/kubernetes.md) for details. 
+## Architecture + +``` + +---------+ +---------+ +---------+ + | Service | --> | Service | --> | Service | + | enable()| --> | enable()| --> | enable()| + +---------+ +---------+ +---------+ + | | | + v v v + +-------------------------------------+ + | OTel Collector (OTLP) | + +-------------------------------------+ + | | | + v v v + Jaeger/Tempo Prometheus Your Backend +``` + +The SDK is a thin layer on OpenTelemetry: +- **SDK**: Generates `run_id`, propagates context, auto-instruments +- **Collector**: PII redaction, cardinality limits, routing, vendor enrichment + ## Documentation -- [Getting Started](./docs/getting-started/) -- [Concepts](./docs/concepts/) -- [Integration](./docs/integration/) -- [API Reference](./docs/api/) +- [Getting Started](./docs/getting-started/) - Installation, quickstart, configuration +- [Concepts](./docs/concepts/) - Runs, context propagation, cost attribution +- [Integration](./docs/integration/) - Auto-instrumentation, Kubernetes, collector setup +- [API Reference](./docs/api/) - `enable()`, `@botanu_use_case`, `emit_outcome()` ## Requirements @@ -84,7 +144,9 @@ See [Kubernetes Deployment Guide](./docs/integration/kubernetes.md) for details. ## Contributing -See [CONTRIBUTING.md](./CONTRIBUTING.md). This project uses DCO sign-off. +We welcome contributions. See [CONTRIBUTING.md](./CONTRIBUTING.md). + +This project follows the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). Sign off your commits: ```bash git commit -s -m "Your commit message" diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index f11f9b1..3591b72 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -1,131 +1,70 @@ # Installation -This guide covers installing Botanu SDK and its optional dependencies. 
- ## Requirements - Python 3.9 or later -- OpenTelemetry Collector (for span processing) - -## Basic Installation +- OpenTelemetry Collector (recommended for production) -Install the core SDK with pip: +## Install ```bash pip install botanu ``` -The core package has minimal dependencies: -- `opentelemetry-api >= 1.20.0` - -This is all you need if you already have OpenTelemetry configured in your application. - -## Installation with Extras +One install gives you everything: -### Full SDK (Recommended for Standalone) +- **OTel SDK** + OTLP HTTP exporter +- **Auto-instrumentation** for 50+ libraries (HTTP, databases, messaging, GenAI, AWS, gRPC) -If you don't have an existing OpenTelemetry setup: - -```bash -pip install "botanu[sdk]" -``` - -This adds: -- `opentelemetry-sdk` - The OTel SDK implementation -- `opentelemetry-exporter-otlp-proto-http` - OTLP HTTP exporter - -### Auto-Instrumentation - -For automatic instrumentation of common libraries: - -```bash -pip install "botanu[instruments]" -``` +Instrumentation packages are lightweight shims that silently no-op when the target library is not installed. Zero bloat. 
-Includes instrumentation for: -- **HTTP clients**: requests, httpx, urllib3, aiohttp -- **Web frameworks**: FastAPI, Flask, Django, Starlette -- **Databases**: SQLAlchemy, psycopg2, asyncpg, pymongo, redis -- **Messaging**: Celery, Kafka -- **Other**: gRPC, logging +## Verify -### GenAI Instrumentation - -For automatic LLM provider instrumentation: - -```bash -pip install "botanu[genai]" -``` - -Includes instrumentation for: -- OpenAI -- Anthropic -- Google Vertex AI -- Google GenAI -- LangChain - -### Everything - -To install all optional dependencies: - -```bash -pip install "botanu[all]" +```python +import botanu +print(botanu.__version__) ``` -### Development +## Package Managers -For development and testing: +### pip / requirements.txt -```bash -pip install "botanu[dev]" +```text +botanu>=0.1.0 ``` -## Verify Installation +### Poetry -```python -import botanu -print(botanu.__version__) +```toml +[tool.poetry.dependencies] +botanu = "^0.1.0" ``` -## Docker - -In a Dockerfile: +### Docker ```dockerfile -FROM python:3.11-slim - +FROM python:3.12-slim WORKDIR /app - -# Install Botanu with SDK extras -RUN pip install "botanu[sdk]" - +RUN pip install botanu COPY . . - CMD ["python", "app.py"] ``` -## Poetry - -```toml -[tool.poetry.dependencies] -botanu = { version = "^0.1.0", extras = ["sdk"] } -``` - -## pip-tools / requirements.txt +## Development -```text -# requirements.in -botanu[sdk]>=0.1.0 -``` +For running tests and linting: -Generate with: ```bash -pip-compile requirements.in +pip install "botanu[dev]" ``` ## Collector Setup -Botanu SDK sends traces to an OpenTelemetry Collector. You'll need one running to receive spans. +The SDK sends traces to an OpenTelemetry Collector via OTLP HTTP (port 4318). 
Configure the endpoint via environment variable: + +```bash +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +``` Quick start with Docker: @@ -133,9 +72,9 @@ Quick start with Docker: docker run -p 4318:4318 otel/opentelemetry-collector:latest ``` -See [Collector Configuration](../integration/collector.md) for detailed setup. +See [Collector Configuration](../integration/collector.md) for production setup. ## Next Steps - [Quickstart](quickstart.md) - Your first instrumented application -- [Configuration](configuration.md) - Customize SDK behavior +- [Configuration](configuration.md) - Environment variables and YAML config diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index b2fd386..3acbf97 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -1,35 +1,52 @@ # Quickstart -Get run-level cost attribution working in minutes. +Get run-level cost attribution working in 5 minutes. ## Prerequisites - Python 3.9+ -- OpenTelemetry Collector (see [Collector Configuration](../integration/collector.md)) +- OpenTelemetry Collector running (see [Collector Configuration](../integration/collector.md)) ## Step 1: Install ```bash -pip install "botanu[all]" +pip install botanu ``` -## Step 2: Enable +## Step 2: Set Environment Variables + +```bash +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +export OTEL_SERVICE_NAME=my-service +``` + +Or in Docker / Kubernetes: + +```yaml +environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 + - OTEL_SERVICE_NAME=my-service +``` + +## Step 3: Enable SDK ```python from botanu import enable -enable(service_name="my-service") +enable() ``` -## Step 3: Define Entry Point +Call `enable()` once at application startup. It reads configuration from environment variables — no hardcoded values needed. 
+ +## Step 4: Define Entry Point ```python from botanu import botanu_use_case -@botanu_use_case(name="my_workflow") -def my_function(): - data = db.query(...) - result = llm.complete(...) +@botanu_use_case(name="Customer Support") +async def handle_ticket(ticket_id: str): + data = await db.query(ticket_id) + result = await llm.complete(data) return result ``` @@ -37,35 +54,45 @@ All LLM calls, database queries, and HTTP requests inside the function are autom ## Complete Example +**Entry service** (`entry/app.py`): + ```python from botanu import enable, botanu_use_case -enable(service_name="my-service") +enable() -@botanu_use_case(name="my_workflow") -def my_function(): - data = db.query(...) - result = openai.chat.completions.create( +@botanu_use_case(name="Customer Support") +async def handle_ticket(ticket_id: str): + data = await db.query(ticket_id) + result = await openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": data}] ) return result ``` +**Intermediate / downstream service** (`intermediate/app.py`): + ```python +from botanu import enable + +enable() # propagates run_id from incoming request — no decorator needed +``` + ## What Gets Tracked | Attribute | Example | Description | |-----------|---------|-------------| -| `botanu.run_id` | `019abc12-...` | Unique run identifier | -| `botanu.use_case` | `my_workflow` | Business use case | +| `botanu.run_id` | `019abc12-...` | Unique run identifier (UUIDv7) | +| `botanu.use_case` | `Customer Support` | Business use case | | `gen_ai.usage.input_tokens` | `150` | LLM input tokens | | `gen_ai.usage.output_tokens` | `200` | LLM output tokens | | `db.system` | `postgresql` | Database system | -All spans share the same `run_id`, enabling cost-per-transaction analytics. +All spans across all services share the same `run_id`, enabling cost-per-transaction analytics. 
## Next Steps - [Configuration](configuration.md) - Environment variables and YAML config - [Kubernetes Deployment](../integration/kubernetes.md) - Zero-code instrumentation at scale -- [Context Propagation](../concepts/context-propagation.md) - Cross-service tracing +- [Context Propagation](../concepts/context-propagation.md) - How run_id flows across services diff --git a/docs/integration/auto-instrumentation.md b/docs/integration/auto-instrumentation.md index bec3e44..be504bc 100644 --- a/docs/integration/auto-instrumentation.md +++ b/docs/integration/auto-instrumentation.md @@ -1,78 +1,107 @@ # Auto-Instrumentation -Automatically instrument common libraries without code changes. +Botanu automatically instruments 50+ libraries with a single `enable()` call — no per-library code changes. -## Installation +## How It Works -```bash -pip install "botanu[all]" -``` - -## Usage +When you call `enable()`, the SDK detects which libraries are installed in your environment and instruments them automatically. Libraries that aren't installed are silently skipped. ```python -from botanu import enable, botanu_use_case - -enable(service_name="my-service") - -@botanu_use_case(name="my_workflow") -def my_function(): - data = db.query(...) - result = openai.chat.completions.create( - model="gpt-4", - messages=[{"role": "user", "content": data}] - ) - return result +from botanu import enable + +enable() # auto-instruments everything that's installed ``` -All operations inside are automatically traced. +No configuration needed. No import order requirements. Just call `enable()` at startup. 
## Supported Libraries -### HTTP Clients +### LLM Providers -| Library | Package | -|---------|---------| -| requests | `opentelemetry-instrumentation-requests` | -| httpx | `opentelemetry-instrumentation-httpx` | -| urllib3 | `opentelemetry-instrumentation-urllib3` | -| aiohttp | `opentelemetry-instrumentation-aiohttp-client` | +| Provider | Instrumentation Package | +|----------|------------------------| +| OpenAI | `opentelemetry-instrumentation-openai-v2` | +| Anthropic | `opentelemetry-instrumentation-anthropic` | +| Vertex AI | `opentelemetry-instrumentation-vertexai` | +| Google GenAI | `opentelemetry-instrumentation-google-generativeai` | +| LangChain | `opentelemetry-instrumentation-langchain` | +| Ollama | `opentelemetry-instrumentation-ollama` | +| CrewAI | `opentelemetry-instrumentation-crewai` | ### Web Frameworks -| Framework | Package | -|-----------|---------| +| Framework | Instrumentation Package | +|-----------|------------------------| | FastAPI | `opentelemetry-instrumentation-fastapi` | | Flask | `opentelemetry-instrumentation-flask` | | Django | `opentelemetry-instrumentation-django` | | Starlette | `opentelemetry-instrumentation-starlette` | +| Falcon | `opentelemetry-instrumentation-falcon` | +| Pyramid | `opentelemetry-instrumentation-pyramid` | +| Tornado | `opentelemetry-instrumentation-tornado` | + +### HTTP Clients + +| Library | Instrumentation Package | +|---------|------------------------| +| requests | `opentelemetry-instrumentation-requests` | +| httpx | `opentelemetry-instrumentation-httpx` | +| urllib3 | `opentelemetry-instrumentation-urllib3` | +| urllib | `opentelemetry-instrumentation-urllib` | +| aiohttp (client) | `opentelemetry-instrumentation-aiohttp-client` | +| aiohttp (server) | `opentelemetry-instrumentation-aiohttp-server` | ### Databases -| Database | Package | -|----------|---------| +| Database | Instrumentation Package | +|----------|------------------------| | SQLAlchemy | 
`opentelemetry-instrumentation-sqlalchemy` | | psycopg2 | `opentelemetry-instrumentation-psycopg2` | +| psycopg3 | `opentelemetry-instrumentation-psycopg` | | asyncpg | `opentelemetry-instrumentation-asyncpg` | +| aiopg | `opentelemetry-instrumentation-aiopg` | | pymongo | `opentelemetry-instrumentation-pymongo` | | redis | `opentelemetry-instrumentation-redis` | +| MySQL | `opentelemetry-instrumentation-mysql` | +| mysqlclient | `opentelemetry-instrumentation-mysqlclient` | +| PyMySQL | `opentelemetry-instrumentation-pymysql` | +| SQLite3 | `opentelemetry-instrumentation-sqlite3` | +| Elasticsearch | `opentelemetry-instrumentation-elasticsearch` | +| Cassandra | `opentelemetry-instrumentation-cassandra` | +| TortoiseORM | `opentelemetry-instrumentation-tortoiseorm` | +| pymemcache | `opentelemetry-instrumentation-pymemcache` | + +### Messaging & Task Queues + +| System | Instrumentation Package | +|--------|------------------------| +| Celery | `opentelemetry-instrumentation-celery` | +| kafka-python | `opentelemetry-instrumentation-kafka-python` | +| confluent-kafka | `opentelemetry-instrumentation-confluent-kafka` | +| aiokafka | `opentelemetry-instrumentation-aiokafka` | +| pika (RabbitMQ) | `opentelemetry-instrumentation-pika` | +| aio-pika | `opentelemetry-instrumentation-aio-pika` | -### Messaging +### AWS -| System | Package | -|--------|---------| -| Celery | `opentelemetry-instrumentation-celery` | -| Kafka | `opentelemetry-instrumentation-kafka-python` | +| Service | Instrumentation Package | +|---------|------------------------| +| botocore | `opentelemetry-instrumentation-botocore` | +| boto3 SQS | `opentelemetry-instrumentation-boto3sqs` | -### LLM Providers +### gRPC -| Provider | Package | -|----------|---------| -| OpenAI | `opentelemetry-instrumentation-openai-v2` | -| Anthropic | `opentelemetry-instrumentation-anthropic` | -| Vertex AI | `opentelemetry-instrumentation-vertexai` | -| Google GenAI | `opentelemetry-instrumentation-google-genai` | -| 
LangChain | `opentelemetry-instrumentation-langchain` | +| Component | Instrumentation Package | +|-----------|------------------------| +| gRPC Client + Server | `opentelemetry-instrumentation-grpc` | + +### Runtime + +| Library | Instrumentation Package | +|---------|------------------------| +| logging | `opentelemetry-instrumentation-logging` | +| threading | `opentelemetry-instrumentation-threading` | +| asyncio | `opentelemetry-instrumentation-asyncio` | ## Context Propagation @@ -103,27 +132,6 @@ db.operation: SELECT db.statement: SELECT * FROM orders WHERE id = ? ``` -## Troubleshooting - -### Spans Not Appearing - -Ensure `enable()` is called before library imports: - -```python -from botanu import enable -enable(service_name="my-service") - -import requests -import openai -``` - -### Check Instrumentation Status - -```python -from opentelemetry.instrumentation.requests import RequestsInstrumentor -print(RequestsInstrumentor().is_instrumented()) -``` - ## See Also - [Kubernetes Deployment](kubernetes.md) - Zero-code instrumentation at scale diff --git a/pyproject.toml b/pyproject.toml index 93be461..224eb49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,76 +42,98 @@ classifiers = [ "Typing :: Typed", ] -# Core dependency — opentelemetry-api only (~50 KB, zero transitive deps). -# Everything else is behind optional extras so adopters never pay for what -# they don't use. +# All dependencies are included in the base install. OTel instrumentation packages +# are lightweight shims (a few KB each) that silently no-op when the target +# library is not installed, so there is zero bloat for unused instrumentations. 
dependencies = [ + # ── OTel core + SDK + exporter ───────────────────────────────── "opentelemetry-api >= 1.20.0", -] - -[project.urls] -Homepage = "https://github.com/botanu-ai/botanu-sdk-python" -Documentation = "https://docs.botanu.ai" -Repository = "https://github.com/botanu-ai/botanu-sdk-python" -Changelog = "https://github.com/botanu-ai/botanu-sdk-python/blob/main/CHANGELOG.md" -Issues = "https://github.com/botanu-ai/botanu-sdk-python/issues" - -# --------------------------------------------------------------------------- -# Optional extras -# --------------------------------------------------------------------------- -[project.optional-dependencies] -# Full OTel SDK + OTLP exporter — needed only when running standalone -# (no pre-existing TracerProvider from Datadog / Splunk / etc.) -sdk = [ "opentelemetry-sdk >= 1.20.0", "opentelemetry-exporter-otlp-proto-http >= 1.20.0", -] - -# Auto-instrumentation libraries for common frameworks -instruments = [ "opentelemetry-instrumentation >= 0.41b0", - "opentelemetry-instrumentation-fastapi >= 0.41b0", - "opentelemetry-instrumentation-requests >= 0.41b0", + + # ── HTTP clients ─────────────────────────────────────────────── "opentelemetry-instrumentation-httpx >= 0.41b0", + "opentelemetry-instrumentation-requests >= 0.41b0", + "opentelemetry-instrumentation-urllib3 >= 0.41b0", + "opentelemetry-instrumentation-urllib >= 0.41b0", + "opentelemetry-instrumentation-aiohttp-client >= 0.41b0", + "opentelemetry-instrumentation-aiohttp-server >= 0.41b0", + + # ── Web frameworks ───────────────────────────────────────────── + "opentelemetry-instrumentation-fastapi >= 0.41b0", "opentelemetry-instrumentation-flask >= 0.41b0", "opentelemetry-instrumentation-django >= 0.41b0", - "opentelemetry-instrumentation-urllib3 >= 0.41b0", "opentelemetry-instrumentation-starlette >= 0.41b0", + "opentelemetry-instrumentation-falcon >= 0.41b0", + "opentelemetry-instrumentation-pyramid >= 0.41b0", + "opentelemetry-instrumentation-tornado >= 
0.41b0", + + # ── gRPC ─────────────────────────────────────────────────────── + "opentelemetry-instrumentation-grpc >= 0.41b0", + + # ── Databases ────────────────────────────────────────────────── "opentelemetry-instrumentation-sqlalchemy >= 0.41b0", + "opentelemetry-instrumentation-psycopg2 >= 0.41b0", + "opentelemetry-instrumentation-psycopg >= 0.41b0", + "opentelemetry-instrumentation-asyncpg >= 0.41b0", + "opentelemetry-instrumentation-aiopg >= 0.41b0", + "opentelemetry-instrumentation-pymongo >= 0.41b0", "opentelemetry-instrumentation-redis >= 0.41b0", + "opentelemetry-instrumentation-mysql >= 0.41b0", + "opentelemetry-instrumentation-mysqlclient >= 0.41b0", + "opentelemetry-instrumentation-pymysql >= 0.41b0", + "opentelemetry-instrumentation-sqlite3 >= 0.41b0", + "opentelemetry-instrumentation-elasticsearch >= 0.41b0", + "opentelemetry-instrumentation-cassandra >= 0.41b0", + "opentelemetry-instrumentation-tortoiseorm >= 0.41b0", + "opentelemetry-instrumentation-pymemcache >= 0.41b0", + + # ── Messaging / Task queues ──────────────────────────────────── "opentelemetry-instrumentation-celery >= 0.41b0", - "opentelemetry-instrumentation-grpc >= 0.41b0", - "opentelemetry-instrumentation-logging >= 0.41b0", -] + "opentelemetry-instrumentation-kafka-python >= 0.41b0", + "opentelemetry-instrumentation-confluent-kafka >= 0.41b0", + "opentelemetry-instrumentation-aiokafka >= 0.41b0", + "opentelemetry-instrumentation-pika >= 0.41b0", + "opentelemetry-instrumentation-aio-pika >= 0.41b0", + + # ── AWS ──────────────────────────────────────────────────────── + "opentelemetry-instrumentation-botocore >= 0.41b0", + "opentelemetry-instrumentation-boto3sqs >= 0.41b0", -# GenAI / AI model auto-instrumentation -genai = [ + # ── GenAI / AI ───────────────────────────────────────────────── "opentelemetry-instrumentation-openai-v2 >= 2.0b0", "opentelemetry-instrumentation-anthropic >= 0.1b0", "opentelemetry-instrumentation-vertexai >= 0.1b0", - 
"opentelemetry-instrumentation-google-genai >= 0.1b0", + "opentelemetry-instrumentation-google-generativeai >= 0.1b0", "opentelemetry-instrumentation-langchain >= 0.1b0", -] + "opentelemetry-instrumentation-ollama >= 0.1b0", + "opentelemetry-instrumentation-crewai >= 0.1b0", -# Cross-service carrier propagation (SQS, Kafka, Celery, Redis) -carriers = [ - "celery >= 5.0.0", - "aiokafka >= 0.8.0", + # ── Runtime / Concurrency ────────────────────────────────────── + "opentelemetry-instrumentation-logging >= 0.41b0", + "opentelemetry-instrumentation-threading >= 0.41b0", ] -# Everything -all = [ - "botanu[sdk,instruments,genai,carriers]", -] +[project.urls] +Homepage = "https://github.com/botanu-ai/botanu-sdk-python" +Documentation = "https://docs.botanu.ai" +Repository = "https://github.com/botanu-ai/botanu-sdk-python" +Changelog = "https://github.com/botanu-ai/botanu-sdk-python/blob/main/CHANGELOG.md" +Issues = "https://github.com/botanu-ai/botanu-sdk-python/issues" -# Development / CI +# --------------------------------------------------------------------------- +# Optional extras (dev only — base install includes everything) +# --------------------------------------------------------------------------- +[project.optional-dependencies] dev = [ - "botanu[all]", "pytest >= 7.4.0", "pytest-asyncio >= 0.21.0", "pytest-cov >= 4.1.0", "coverage[toml] >= 7.0", "httpx >= 0.24.0", + "starlette >= 0.27.0, < 0.30.0; python_version < '3.10'", + "starlette >= 0.27.0; python_version >= '3.10'", "ruff >= 0.4.0", "mypy >= 1.7.0", "pre-commit >= 3.5.0", diff --git a/src/botanu/__init__.py b/src/botanu/__init__.py index 2ccf3d8..527714b 100644 --- a/src/botanu/__init__.py +++ b/src/botanu/__init__.py @@ -7,7 +7,7 @@ from botanu import enable, botanu_use_case, emit_outcome - enable(service_name="my-app") + enable() # reads config from OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT env vars @botanu_use_case(name="Customer Support") async def handle_request(data): diff --git 
a/src/botanu/sdk/bootstrap.py b/src/botanu/sdk/bootstrap.py index 4fa34f2..879bffd 100644 --- a/src/botanu/sdk/bootstrap.py +++ b/src/botanu/sdk/bootstrap.py @@ -14,12 +14,14 @@ Usage:: from botanu import enable - enable(service_name="my-app", otlp_endpoint="http://collector:4318") + enable() # reads OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT from env """ from __future__ import annotations import logging +import os +import threading from typing import TYPE_CHECKING, List, Optional if TYPE_CHECKING: @@ -27,6 +29,7 @@ logger = logging.getLogger(__name__) +_lock = threading.RLock() _initialized = False _current_config: Optional[BotanuConfig] = None @@ -60,122 +63,140 @@ def enable( """ global _initialized, _current_config - if _initialized: - logger.warning("Botanu SDK already initialized") - return False + with _lock: + if _initialized: + logger.warning("Botanu SDK already initialized") + return False - logging.basicConfig(level=getattr(logging, log_level.upper())) + logging.basicConfig(level=getattr(logging, log_level.upper())) - from botanu.sdk.config import BotanuConfig as ConfigClass + from botanu.sdk.config import BotanuConfig as ConfigClass - if config is not None: - cfg = config - elif config_file is not None: - cfg = ConfigClass.from_yaml(config_file) - else: - cfg = ConfigClass.from_file_or_env() + if config is not None: + cfg = config + elif config_file is not None: + cfg = ConfigClass.from_yaml(config_file) + else: + cfg = ConfigClass.from_file_or_env() - if service_name is not None: - cfg.service_name = service_name - if otlp_endpoint is not None: - cfg.otlp_endpoint = otlp_endpoint - if environment is not None: - cfg.deployment_environment = environment + if service_name is not None: + cfg.service_name = service_name + if otlp_endpoint is not None: + cfg.otlp_endpoint = otlp_endpoint + if environment is not None: + cfg.deployment_environment = environment - _current_config = cfg + _current_config = cfg - traces_endpoint = cfg.otlp_endpoint - if 
traces_endpoint and not traces_endpoint.endswith("/v1/traces"): - traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" + traces_endpoint = cfg.otlp_endpoint + if traces_endpoint and not traces_endpoint.endswith("/v1/traces"): + traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" - logger.info( - "Initializing Botanu SDK: service=%s, env=%s, endpoint=%s", - cfg.service_name, - cfg.deployment_environment, - traces_endpoint, - ) - - try: - from opentelemetry import trace - from opentelemetry.baggage.propagation import W3CBaggagePropagator - from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - from opentelemetry.propagate import set_global_textmap - from opentelemetry.propagators.composite import CompositePropagator - from opentelemetry.sdk.resources import Resource - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - - from botanu._version import __version__ - from botanu.processors import RunContextEnricher - from botanu.resources.detector import detect_all_resources - - # Build resource attributes - resource_attrs = { - "service.name": cfg.service_name, - "deployment.environment": cfg.deployment_environment, - "telemetry.sdk.name": "botanu", - "telemetry.sdk.version": __version__, - } - if cfg.service_version: - resource_attrs["service.version"] = cfg.service_version - if cfg.service_namespace: - resource_attrs["service.namespace"] = cfg.service_namespace - - # Auto-detect resources (K8s, cloud, host, container, FaaS) - if cfg.auto_detect_resources: - detected = detect_all_resources() - for key, value in detected.items(): - if key not in resource_attrs: - resource_attrs[key] = value - if detected: - logger.debug("Auto-detected resources: %s", list(detected.keys())) - - resource = Resource.create(resource_attrs) - provider = TracerProvider(resource=resource) - - # 
RunContextEnricher — the ONLY processor in SDK. - # Reads run_id from baggage, stamps on all spans. - lean_mode = cfg.propagation_mode == "lean" - provider.add_span_processor(RunContextEnricher(lean_mode=lean_mode)) - - # OTLP exporter - exporter = OTLPSpanExporter( - endpoint=traces_endpoint, - headers=cfg.otlp_headers or {}, - ) - provider.add_span_processor( - BatchSpanProcessor( - exporter, - max_export_batch_size=cfg.max_export_batch_size, - max_queue_size=cfg.max_queue_size, - schedule_delay_millis=cfg.schedule_delay_millis, + otel_sampler_env = os.getenv("OTEL_TRACES_SAMPLER") + if otel_sampler_env and otel_sampler_env != "always_on": + logger.warning( + "OTEL_TRACES_SAMPLER=%s is set but Botanu enforces ALWAYS_ON. No spans will be sampled or dropped.", + otel_sampler_env, ) + + logger.info( + "Initializing Botanu SDK: service=%s, env=%s, endpoint=%s", + cfg.service_name, + cfg.deployment_environment, + traces_endpoint, ) - trace.set_tracer_provider(provider) + try: + from opentelemetry import trace + from opentelemetry.baggage.propagation import W3CBaggagePropagator + from opentelemetry.propagate import set_global_textmap + from opentelemetry.propagators.composite import CompositePropagator + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + except ImportError as exc: + logger.error("Missing opentelemetry-api. Install with: pip install botanu") + raise ImportError("opentelemetry-api is required. Install with: pip install botanu") from exc + + try: + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + except ImportError as exc: + logger.error("Missing OTel SDK dependencies. 
Install with: pip install botanu") + raise ImportError("OTel SDK and exporter required for enable(). Install with: pip install botanu") from exc + + try: + from botanu._version import __version__ + from botanu.processors import RunContextEnricher + from botanu.resources.detector import detect_all_resources + + resource_attrs = { + "service.name": cfg.service_name, + "deployment.environment": cfg.deployment_environment, + "telemetry.sdk.name": "botanu", + "telemetry.sdk.version": __version__, + } + if cfg.service_version: + resource_attrs["service.version"] = cfg.service_version + if cfg.service_namespace: + resource_attrs["service.namespace"] = cfg.service_namespace + + if cfg.auto_detect_resources: + detected = detect_all_resources() + for key, value in detected.items(): + if key not in resource_attrs: + resource_attrs[key] = value + if detected: + logger.debug("Auto-detected resources: %s", list(detected.keys())) + + resource = Resource.create(resource_attrs) + + existing = trace.get_tracer_provider() + if isinstance(existing, TracerProvider): + provider = existing + logger.info("Reusing existing TracerProvider — adding Botanu processors") + else: + provider = TracerProvider(resource=resource, sampler=ALWAYS_ON) + trace.set_tracer_provider(provider) + + lean_mode = cfg.propagation_mode == "lean" + provider.add_span_processor(RunContextEnricher(lean_mode=lean_mode)) + + exporter = OTLPSpanExporter( + endpoint=traces_endpoint, + headers=cfg.otlp_headers or {}, + ) + provider.add_span_processor( + BatchSpanProcessor( + exporter, + max_export_batch_size=cfg.max_export_batch_size, + max_queue_size=cfg.max_queue_size, + schedule_delay_millis=cfg.schedule_delay_millis, + export_timeout_millis=cfg.export_timeout_millis, + ) + ) - # Propagators (W3C TraceContext + Baggage) - set_global_textmap( - CompositePropagator( - [ - TraceContextTextMapPropagator(), - W3CBaggagePropagator(), - ] + set_global_textmap( + CompositePropagator( + [ + TraceContextTextMapPropagator(), + 
W3CBaggagePropagator(), + ] + ) ) - ) - logger.info("Botanu SDK tracing initialized") + logger.info("Botanu SDK tracing initialized") - if auto_instrumentation: - _enable_auto_instrumentation() + if auto_instrumentation: + _enable_auto_instrumentation() - _initialized = True - return True + _initialized = True + return True - except Exception as exc: - logger.error("Failed to initialize Botanu SDK: %s", exc, exc_info=True) - return False + except Exception as exc: + logger.error("Failed to initialize Botanu SDK: %s", exc, exc_info=True) + return False def _enable_auto_instrumentation() -> None: @@ -187,45 +208,92 @@ def _enable_auto_instrumentation() -> None: enabled: List[str] = [] failed: List[tuple[str, str]] = [] - # HTTP clients - _try_instrument(enabled, failed, "httpx", "opentelemetry.instrumentation.httpx", "HTTPXClientInstrumentation") + # ── HTTP clients ────────────────────────────────────────────── + _try_instrument(enabled, failed, "httpx", "opentelemetry.instrumentation.httpx", "HTTPXClientInstrumentor") _try_instrument(enabled, failed, "requests", "opentelemetry.instrumentation.requests", "RequestsInstrumentor") _try_instrument(enabled, failed, "urllib3", "opentelemetry.instrumentation.urllib3", "URLLib3Instrumentor") + _try_instrument(enabled, failed, "urllib", "opentelemetry.instrumentation.urllib", "URLLibInstrumentor") _try_instrument( - enabled, failed, "aiohttp", "opentelemetry.instrumentation.aiohttp_client", "AioHttpClientInstrumentor" + enabled, failed, "aiohttp_client", "opentelemetry.instrumentation.aiohttp_client", "AioHttpClientInstrumentor" + ) + _try_instrument( + enabled, failed, "aiohttp_server", "opentelemetry.instrumentation.aiohttp_server", "AioHttpServerInstrumentor" ) - # Web frameworks + # ── Web frameworks ──────────────────────────────────────────── _try_instrument(enabled, failed, "fastapi", "opentelemetry.instrumentation.fastapi", "FastAPIInstrumentor") _try_instrument(enabled, failed, "flask", 
"opentelemetry.instrumentation.flask", "FlaskInstrumentor") _try_instrument(enabled, failed, "django", "opentelemetry.instrumentation.django", "DjangoInstrumentor") _try_instrument(enabled, failed, "starlette", "opentelemetry.instrumentation.starlette", "StarletteInstrumentor") + _try_instrument(enabled, failed, "falcon", "opentelemetry.instrumentation.falcon", "FalconInstrumentor") + _try_instrument(enabled, failed, "pyramid", "opentelemetry.instrumentation.pyramid", "PyramidInstrumentor") + _try_instrument(enabled, failed, "tornado", "opentelemetry.instrumentation.tornado", "TornadoInstrumentor") - # Databases + # ── Databases ───────────────────────────────────────────────── _try_instrument(enabled, failed, "sqlalchemy", "opentelemetry.instrumentation.sqlalchemy", "SQLAlchemyInstrumentor") _try_instrument(enabled, failed, "psycopg2", "opentelemetry.instrumentation.psycopg2", "Psycopg2Instrumentor") + _try_instrument(enabled, failed, "psycopg", "opentelemetry.instrumentation.psycopg", "PsycopgInstrumentor") _try_instrument(enabled, failed, "asyncpg", "opentelemetry.instrumentation.asyncpg", "AsyncPGInstrumentor") + _try_instrument(enabled, failed, "aiopg", "opentelemetry.instrumentation.aiopg", "AiopgInstrumentor") _try_instrument(enabled, failed, "pymongo", "opentelemetry.instrumentation.pymongo", "PymongoInstrumentor") _try_instrument(enabled, failed, "redis", "opentelemetry.instrumentation.redis", "RedisInstrumentor") + _try_instrument(enabled, failed, "mysql", "opentelemetry.instrumentation.mysql", "MySQLInstrumentor") + _try_instrument( + enabled, failed, "mysqlclient", "opentelemetry.instrumentation.mysqlclient", "MySQLClientInstrumentor" + ) + _try_instrument(enabled, failed, "pymysql", "opentelemetry.instrumentation.pymysql", "PyMySQLInstrumentor") + _try_instrument(enabled, failed, "sqlite3", "opentelemetry.instrumentation.sqlite3", "SQLite3Instrumentor") + _try_instrument( + enabled, failed, "elasticsearch", 
"opentelemetry.instrumentation.elasticsearch", "ElasticsearchInstrumentor" + ) + _try_instrument(enabled, failed, "cassandra", "opentelemetry.instrumentation.cassandra", "CassandraInstrumentor") + _try_instrument( + enabled, failed, "tortoise_orm", "opentelemetry.instrumentation.tortoiseorm", "TortoiseORMInstrumentor" + ) + + # ── Caching ─────────────────────────────────────────────────── + _try_instrument(enabled, failed, "pymemcache", "opentelemetry.instrumentation.pymemcache", "PymemcacheInstrumentor") - # Messaging + # ── Messaging / Task queues ─────────────────────────────────── _try_instrument(enabled, failed, "celery", "opentelemetry.instrumentation.celery", "CeleryInstrumentor") - _try_instrument(enabled, failed, "kafka", "opentelemetry.instrumentation.kafka", "KafkaInstrumentor") + _try_instrument(enabled, failed, "kafka-python", "opentelemetry.instrumentation.kafka_python", "KafkaInstrumentor") + _try_instrument( + enabled, + failed, + "confluent-kafka", + "opentelemetry.instrumentation.confluent_kafka", + "ConfluentKafkaInstrumentor", + ) + _try_instrument(enabled, failed, "aiokafka", "opentelemetry.instrumentation.aiokafka", "AioKafkaInstrumentor") + _try_instrument(enabled, failed, "pika", "opentelemetry.instrumentation.pika", "PikaInstrumentor") + _try_instrument(enabled, failed, "aio-pika", "opentelemetry.instrumentation.aio_pika", "AioPikaInstrumentor") + + # ── AWS ─────────────────────────────────────────────────────── + _try_instrument(enabled, failed, "botocore", "opentelemetry.instrumentation.botocore", "BotocoreInstrumentor") + _try_instrument(enabled, failed, "boto3sqs", "opentelemetry.instrumentation.boto3sqs", "Boto3SQSInstrumentor") - # gRPC + # ── gRPC ────────────────────────────────────────────────────── _try_instrument_grpc(enabled, failed) - # GenAI / AI + # ── GenAI / AI ──────────────────────────────────────────────── _try_instrument(enabled, failed, "openai", "opentelemetry.instrumentation.openai_v2", "OpenAIInstrumentor") 
_try_instrument(enabled, failed, "anthropic", "opentelemetry.instrumentation.anthropic", "AnthropicInstrumentor") _try_instrument(enabled, failed, "vertexai", "opentelemetry.instrumentation.vertexai", "VertexAIInstrumentor") _try_instrument( - enabled, failed, "google_genai", "opentelemetry.instrumentation.google_genai", "GoogleGenAiInstrumentor" + enabled, + failed, + "google_genai", + "opentelemetry.instrumentation.google_generativeai", + "GoogleGenerativeAIInstrumentor", ) _try_instrument(enabled, failed, "langchain", "opentelemetry.instrumentation.langchain", "LangchainInstrumentor") + _try_instrument(enabled, failed, "ollama", "opentelemetry.instrumentation.ollama", "OllamaInstrumentor") + _try_instrument(enabled, failed, "crewai", "opentelemetry.instrumentation.crewai", "CrewAIInstrumentor") - # Runtime + # ── Runtime / Concurrency ───────────────────────────────────── _try_instrument(enabled, failed, "logging", "opentelemetry.instrumentation.logging", "LoggingInstrumentor") + _try_instrument(enabled, failed, "threading", "opentelemetry.instrumentation.threading", "ThreadingInstrumentor") + _try_instrument(enabled, failed, "asyncio", "opentelemetry.instrumentation.asyncio", "AsyncioInstrumentor") if enabled: logger.info("Auto-instrumentation enabled: %s", ", ".join(enabled)) @@ -290,20 +358,24 @@ def disable() -> None: Call on application shutdown for clean exit. 
""" - global _initialized + global _initialized, _current_config - if not _initialized: - return + with _lock: + if not _initialized: + return - try: - from opentelemetry import trace + try: + from opentelemetry import trace - provider = trace.get_tracer_provider() - if hasattr(provider, "shutdown"): - provider.shutdown() + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush(timeout_millis=5000) + if hasattr(provider, "shutdown"): + provider.shutdown() - _initialized = False - logger.info("Botanu SDK shutdown complete") + _initialized = False + _current_config = None + logger.info("Botanu SDK shutdown complete") - except Exception as exc: - logger.error("Error during Botanu SDK shutdown: %s", exc) + except Exception as exc: + logger.error("Error during Botanu SDK shutdown: %s", exc) diff --git a/src/botanu/sdk/config.py b/src/botanu/sdk/config.py index c52ffc5..10b6646 100644 --- a/src/botanu/sdk/config.py +++ b/src/botanu/sdk/config.py @@ -35,12 +35,10 @@ class BotanuConfig: The SDK is a thin wrapper on OpenTelemetry. PII redaction, cardinality limits, and vendor enrichment are handled by the OTel Collector — not here. - Example:: + Typically configured via environment variables (no hardcoded values):: - >>> config = BotanuConfig( - ... service_name="my-service", - ... otlp_endpoint="http://collector:4318/v1/traces", - ... ) + >>> # Reads from OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT, etc. + >>> config = BotanuConfig() >>> # Or load from YAML >>> config = BotanuConfig.from_yaml("config/botanu.yaml") @@ -60,9 +58,12 @@ class BotanuConfig: otlp_headers: Optional[Dict[str, str]] = None # Span export configuration + # Large queue prevents span loss under burst traffic. + # At ~1KB/span, 65536 spans ≈ 64MB memory ceiling. 
max_export_batch_size: int = 512 - max_queue_size: int = 2048 + max_queue_size: int = 65536 schedule_delay_millis: int = 5000 + export_timeout_millis: int = 30000 # Propagation mode: "lean" (run_id + use_case only) or "full" (all context) propagation_mode: str = "lean" @@ -106,9 +107,15 @@ class BotanuConfig: _config_file: Optional[str] = field(default=None, repr=False) def __post_init__(self) -> None: - """Apply environment variable defaults.""" + """Apply environment variable defaults. + + Precedence: BOTANU_* > OTEL_* > defaults + """ if self.service_name is None: - self.service_name = os.getenv("OTEL_SERVICE_NAME", "unknown_service") + self.service_name = os.getenv( + "BOTANU_SERVICE_NAME", + os.getenv("OTEL_SERVICE_NAME", "unknown_service"), + ) if self.service_version is None: self.service_version = os.getenv("OTEL_SERVICE_VERSION") @@ -122,22 +129,49 @@ def __post_init__(self) -> None: if self.deployment_environment is None: self.deployment_environment = os.getenv( - "OTEL_DEPLOYMENT_ENVIRONMENT", - os.getenv("BOTANU_ENVIRONMENT", "production"), + "BOTANU_ENVIRONMENT", + os.getenv("OTEL_DEPLOYMENT_ENVIRONMENT", "production"), ) if self.otlp_endpoint is None: - env_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") - if env_endpoint: - self.otlp_endpoint = env_endpoint + # Check BOTANU_COLLECTOR_ENDPOINT first, then OTEL_* vars + botanu_endpoint = os.getenv("BOTANU_COLLECTOR_ENDPOINT") + if botanu_endpoint: + self.otlp_endpoint = botanu_endpoint else: - base = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") - self.otlp_endpoint = f"{base}/v1/traces" + env_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") + if env_endpoint: + self.otlp_endpoint = env_endpoint + else: + base = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + self.otlp_endpoint = base env_propagation_mode = os.getenv("BOTANU_PROPAGATION_MODE") if env_propagation_mode and env_propagation_mode in ("lean", "full"): self.propagation_mode = 
env_propagation_mode + # Export tuning via env vars + env_queue_size = os.getenv("BOTANU_MAX_QUEUE_SIZE") + if env_queue_size: + try: + self.max_queue_size = int(env_queue_size) + except ValueError: + pass + + env_batch_size = os.getenv("BOTANU_MAX_EXPORT_BATCH_SIZE") + if env_batch_size: + try: + self.max_export_batch_size = int(env_batch_size) + except ValueError: + pass + + env_export_timeout = os.getenv("BOTANU_EXPORT_TIMEOUT_MILLIS") + if env_export_timeout: + try: + self.export_timeout_millis = int(env_export_timeout) + except ValueError: + pass + # ------------------------------------------------------------------ # YAML loading # ------------------------------------------------------------------ @@ -242,8 +276,9 @@ def _from_dict( otlp_endpoint=otlp.get("endpoint"), otlp_headers=otlp.get("headers"), max_export_batch_size=export.get("batch_size", 512), - max_queue_size=export.get("queue_size", 2048), + max_queue_size=export.get("queue_size", 65536), schedule_delay_millis=export.get("delay_ms", 5000), + export_timeout_millis=export.get("export_timeout_ms", 30000), propagation_mode=propagation.get("mode", "lean"), auto_instrument_packages=(auto_packages if auto_packages else BotanuConfig().auto_instrument_packages), _config_file=config_file, @@ -269,6 +304,7 @@ def to_dict(self) -> Dict[str, Any]: "batch_size": self.max_export_batch_size, "queue_size": self.max_queue_size, "delay_ms": self.schedule_delay_millis, + "export_timeout_ms": self.export_timeout_millis, }, "propagation": { "mode": self.propagation_mode, diff --git a/src/botanu/sdk/context.py b/src/botanu/sdk/context.py index 1beaeaf..5a75e3f 100644 --- a/src/botanu/sdk/context.py +++ b/src/botanu/sdk/context.py @@ -20,6 +20,16 @@ def set_baggage(key: str, value: str) -> object: Baggage is automatically propagated across service boundaries via W3C Baggage header. + .. warning:: + + Each call pushes a new context onto the stack. 
The returned token + **must** be passed to ``opentelemetry.context.detach()`` when the + scope ends, otherwise the context stack grows unboundedly (memory + leak in long-running processes). + + For setting multiple keys, prefer building the context manually + and attaching once — see ``decorators.py`` for the pattern. + Args: key: Baggage key (e.g., ``"botanu.run_id"``). value: Baggage value. diff --git a/src/botanu/sdk/decorators.py b/src/botanu/sdk/decorators.py index 7b490af..4bffa6c 100644 --- a/src/botanu/sdk/decorators.py +++ b/src/botanu/sdk/decorators.py @@ -16,14 +16,17 @@ import functools import hashlib import inspect +from collections.abc import Mapping from datetime import datetime, timezone from typing import Any, Callable, Dict, Optional, TypeVar, Union +from opentelemetry import baggage as otel_baggage from opentelemetry import trace +from opentelemetry.context import attach, detach, get_current from opentelemetry.trace import SpanKind, Status, StatusCode from botanu.models.run_context import RunContext, RunStatus -from botanu.sdk.context import get_baggage, set_baggage +from botanu.sdk.context import get_baggage T = TypeVar("T") @@ -112,14 +115,18 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> T: }, ) + ctx = get_current() for key, value in run_ctx.to_baggage_dict().items(): - set_baggage(key, value) + ctx = otel_baggage.set_baggage(key, value, context=ctx) + baggage_token = attach(ctx) try: result = await func(*args, **kwargs) span_attrs = getattr(span, "attributes", None) - existing_outcome = span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, dict) else None + existing_outcome = ( + span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, Mapping) else None + ) if existing_outcome is None and auto_outcome_on_success: run_ctx.complete(RunStatus.SUCCESS) @@ -139,6 +146,8 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> T: error_class=exc.__class__.__name__, ) raise + finally: + detach(baggage_token) 
@functools.wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> T: @@ -168,14 +177,18 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> T: }, ) + ctx = get_current() for key, value in run_ctx.to_baggage_dict().items(): - set_baggage(key, value) + ctx = otel_baggage.set_baggage(key, value, context=ctx) + baggage_token = attach(ctx) try: result = func(*args, **kwargs) span_attrs = getattr(span, "attributes", None) - existing_outcome = span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, dict) else None + existing_outcome = ( + span_attrs.get("botanu.outcome.status") if isinstance(span_attrs, Mapping) else None + ) if existing_outcome is None and auto_outcome_on_success: run_ctx.complete(RunStatus.SUCCESS) @@ -195,6 +208,8 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> T: error_class=exc.__class__.__name__, ) raise + finally: + detach(baggage_token) if is_async: return async_wrapper # type: ignore[return-value] @@ -230,7 +245,6 @@ def _emit_run_completed( span.set_attribute("botanu.run.duration_ms", duration_ms) -# Alias use_case = botanu_use_case diff --git a/src/botanu/sdk/middleware.py b/src/botanu/sdk/middleware.py index 78bc987..83eb742 100644 --- a/src/botanu/sdk/middleware.py +++ b/src/botanu/sdk/middleware.py @@ -12,13 +12,13 @@ import uuid from typing import Optional -from opentelemetry import baggage, trace +from opentelemetry import baggage as otel_baggage +from opentelemetry import trace +from opentelemetry.context import attach, detach, get_current from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response -from botanu.sdk.context import set_baggage - class BotanuMiddleware(BaseHTTPMiddleware): """FastAPI middleware to enrich spans with Botanu context. 
@@ -59,37 +59,44 @@ async def dispatch(self, request: Request, call_next: object) -> Response: # ty """Process request and enrich span with Botanu context.""" span = trace.get_current_span() - # Extract run_id from baggage or headers - run_id = baggage.get_baggage("botanu.run_id") + run_id = otel_baggage.get_baggage("botanu.run_id") if not run_id: run_id = request.headers.get("x-botanu-run-id") if not run_id and self.auto_generate_run_id: run_id = str(uuid.uuid4()) - use_case = baggage.get_baggage("botanu.use_case") or request.headers.get("x-botanu-use-case") or self.use_case - workflow = baggage.get_baggage("botanu.workflow") or request.headers.get("x-botanu-workflow") or self.workflow - customer_id = baggage.get_baggage("botanu.customer_id") or request.headers.get("x-botanu-customer-id") + use_case = ( + otel_baggage.get_baggage("botanu.use_case") or request.headers.get("x-botanu-use-case") or self.use_case + ) + workflow = ( + otel_baggage.get_baggage("botanu.workflow") or request.headers.get("x-botanu-workflow") or self.workflow + ) + customer_id = otel_baggage.get_baggage("botanu.customer_id") or request.headers.get("x-botanu-customer-id") - # Enrich span with Botanu attributes if run_id: span.set_attribute("botanu.run_id", run_id) - set_baggage("botanu.run_id", run_id) - span.set_attribute("botanu.use_case", use_case) - set_baggage("botanu.use_case", use_case) - span.set_attribute("botanu.workflow", workflow) - set_baggage("botanu.workflow", workflow) - if customer_id: span.set_attribute("botanu.customer_id", customer_id) - set_baggage("botanu.customer_id", customer_id) span.set_attribute("http.route", request.url.path) span.set_attribute("http.method", request.method) - response = await call_next(request) # type: ignore[misc] + ctx = get_current() + if run_id: + ctx = otel_baggage.set_baggage("botanu.run_id", run_id, context=ctx) + ctx = otel_baggage.set_baggage("botanu.use_case", use_case, context=ctx) + ctx = otel_baggage.set_baggage("botanu.workflow", 
workflow, context=ctx) + if customer_id: + ctx = otel_baggage.set_baggage("botanu.customer_id", customer_id, context=ctx) + + baggage_token = attach(ctx) + try: + response = await call_next(request) # type: ignore[misc] + finally: + detach(baggage_token) if run_id: response.headers["x-botanu-run-id"] = run_id diff --git a/src/botanu/sdk/span_helpers.py b/src/botanu/sdk/span_helpers.py index 98eaffd..f7388ff 100644 --- a/src/botanu/sdk/span_helpers.py +++ b/src/botanu/sdk/span_helpers.py @@ -53,7 +53,6 @@ def emit_outcome( if reason: span.set_attribute("botanu.outcome.reason", reason) - # Add span event for timeline visibility event_attrs: dict[str, object] = {"status": status} if value_type: event_attrs["value_type"] = value_type diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e6ae60f --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/conftest.py b/tests/conftest.py index 288f918..202e424 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.sampling import ALWAYS_ON # Module-level provider and exporter to avoid "cannot override" warnings _provider: TracerProvider = None @@ -21,7 +22,7 @@ def _get_or_create_provider() -> tuple[TracerProvider, InMemorySpanExporter]: global _provider, _exporter if _provider is None: - _provider = TracerProvider() + _provider = TracerProvider(sampler=ALWAYS_ON) _exporter = InMemorySpanExporter() _provider.add_span_processor(SimpleSpanProcessor(_exporter)) trace.set_tracer_provider(_provider) diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py new file mode 100644 index 0000000..ac0a2c9 --- /dev/null +++ 
b/tests/unit/test_bootstrap.py @@ -0,0 +1,670 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for bootstrap module — enable(), auto-instrumentation, and config env var precedence.""" + +from __future__ import annotations + +import os +from unittest import mock + +from botanu.sdk.config import BotanuConfig + +# --------------------------------------------------------------------------- +# Config env-var precedence: BOTANU_* > OTEL_* > defaults +# --------------------------------------------------------------------------- + + +class TestConfigBotanuEnvPrecedence: + """BOTANU_* env vars take precedence over OTEL_* equivalents.""" + + def test_botanu_service_name_over_otel(self): + env = { + "BOTANU_SERVICE_NAME": "botanu-svc", + "OTEL_SERVICE_NAME": "otel-svc", + } + with mock.patch.dict(os.environ, env, clear=False): + cfg = BotanuConfig() + assert cfg.service_name == "botanu-svc" + + def test_otel_service_name_fallback(self): + env = {"OTEL_SERVICE_NAME": "otel-svc"} + with mock.patch.dict(os.environ, env, clear=False): + for key in ["BOTANU_SERVICE_NAME"]: + os.environ.pop(key, None) + cfg = BotanuConfig() + assert cfg.service_name == "otel-svc" + + def test_service_name_default(self): + with mock.patch.dict(os.environ, {}, clear=True): + cfg = BotanuConfig() + assert cfg.service_name == "unknown_service" + + def test_botanu_collector_endpoint_over_otel(self): + env = { + "BOTANU_COLLECTOR_ENDPOINT": "http://botanu-collector:4318", + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://otel-collector:4318", + } + with mock.patch.dict(os.environ, env, clear=False): + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://botanu-collector:4318" + + def test_otel_exporter_endpoint_fallback(self): + env = {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://otel-collector:4318"} + with mock.patch.dict(os.environ, env, clear=False): + for key in ["BOTANU_COLLECTOR_ENDPOINT", "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"]: + os.environ.pop(key, 
None) + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://otel-collector:4318" + + def test_otel_traces_endpoint_over_base_endpoint(self): + env = { + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "http://traces:4318/v1/traces", + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://base:4318", + } + with mock.patch.dict(os.environ, env, clear=False): + for key in ["BOTANU_COLLECTOR_ENDPOINT"]: + os.environ.pop(key, None) + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://traces:4318/v1/traces" + + def test_endpoint_default_localhost(self): + with mock.patch.dict(os.environ, {}, clear=True): + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://localhost:4318" + + def test_botanu_environment_over_otel(self): + env = { + "BOTANU_ENVIRONMENT": "botanu-staging", + "OTEL_DEPLOYMENT_ENVIRONMENT": "otel-prod", + } + with mock.patch.dict(os.environ, env, clear=False): + cfg = BotanuConfig() + assert cfg.deployment_environment == "botanu-staging" + + def test_otel_deployment_environment_fallback(self): + env = {"OTEL_DEPLOYMENT_ENVIRONMENT": "otel-prod"} + with mock.patch.dict(os.environ, env, clear=False): + for key in ["BOTANU_ENVIRONMENT"]: + os.environ.pop(key, None) + cfg = BotanuConfig() + assert cfg.deployment_environment == "otel-prod" + + def test_environment_default_production(self): + with mock.patch.dict(os.environ, {}, clear=True): + cfg = BotanuConfig() + assert cfg.deployment_environment == "production" + + def test_explicit_args_override_all_env(self): + env = { + "BOTANU_SERVICE_NAME": "env-name", + "BOTANU_COLLECTOR_ENDPOINT": "http://env:4318", + "BOTANU_ENVIRONMENT": "env-staging", + } + with mock.patch.dict(os.environ, env, clear=False): + cfg = BotanuConfig( + service_name="explicit-name", + otlp_endpoint="http://explicit:4318", + deployment_environment="explicit-staging", + ) + assert cfg.service_name == "explicit-name" + assert cfg.otlp_endpoint == "http://explicit:4318" + assert cfg.deployment_environment == "explicit-staging" + + +# 
--------------------------------------------------------------------------- +# Config: propagation mode +# --------------------------------------------------------------------------- + + +class TestConfigPropagationMode: + """Tests for propagation mode configuration.""" + + def test_default_lean(self): + with mock.patch.dict(os.environ, {}, clear=True): + cfg = BotanuConfig() + assert cfg.propagation_mode == "lean" + + def test_env_var_full(self): + with mock.patch.dict(os.environ, {"BOTANU_PROPAGATION_MODE": "full"}): + cfg = BotanuConfig() + assert cfg.propagation_mode == "full" + + def test_env_var_lean(self): + with mock.patch.dict(os.environ, {"BOTANU_PROPAGATION_MODE": "lean"}): + cfg = BotanuConfig() + assert cfg.propagation_mode == "lean" + + def test_invalid_propagation_mode_ignored(self): + with mock.patch.dict(os.environ, {"BOTANU_PROPAGATION_MODE": "invalid"}): + cfg = BotanuConfig() + assert cfg.propagation_mode == "lean" + + +# --------------------------------------------------------------------------- +# Config: auto-detect resources +# --------------------------------------------------------------------------- + + +class TestConfigAutoDetectResources: + """Tests for auto-detect resources toggle.""" + + def test_default_true(self): + with mock.patch.dict(os.environ, {}, clear=True): + cfg = BotanuConfig() + assert cfg.auto_detect_resources is True + + def test_env_disable(self): + with mock.patch.dict(os.environ, {"BOTANU_AUTO_DETECT_RESOURCES": "false"}): + cfg = BotanuConfig() + assert cfg.auto_detect_resources is False + + def test_env_enable(self): + with mock.patch.dict(os.environ, {"BOTANU_AUTO_DETECT_RESOURCES": "true"}): + cfg = BotanuConfig() + assert cfg.auto_detect_resources is True + + def test_env_numeric(self): + with mock.patch.dict(os.environ, {"BOTANU_AUTO_DETECT_RESOURCES": "0"}): + cfg = BotanuConfig() + assert cfg.auto_detect_resources is False + + +# --------------------------------------------------------------------------- +# 
Bootstrap: auto-instrumentation coverage +# --------------------------------------------------------------------------- + + +class TestAutoInstrumentationCoverage: + """Verify all expected instrumentations are wired in _enable_auto_instrumentation.""" + + def _get_instrumentation_names(self) -> list[str]: + """Extract all instrumentation names from the bootstrap source.""" + import ast + import inspect + + from botanu.sdk.bootstrap import _enable_auto_instrumentation + + source = inspect.getsource(_enable_auto_instrumentation) + names: list[str] = [] + # Parse all _try_instrument calls and extract the 'name' argument + tree = ast.parse(source) + for node in ast.walk(tree): + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): + if node.func.id == "_try_instrument" and len(node.args) >= 3: + name_arg = node.args[2] + if isinstance(name_arg, ast.Constant): + names.append(name_arg.value) + return names + + # ── HTTP clients ────────────────────────────────────────────── + + def test_httpx_instrumented(self): + assert "httpx" in self._get_instrumentation_names() + + def test_requests_instrumented(self): + assert "requests" in self._get_instrumentation_names() + + def test_urllib3_instrumented(self): + assert "urllib3" in self._get_instrumentation_names() + + def test_urllib_instrumented(self): + assert "urllib" in self._get_instrumentation_names() + + def test_aiohttp_client_instrumented(self): + assert "aiohttp_client" in self._get_instrumentation_names() + + def test_aiohttp_server_instrumented(self): + assert "aiohttp_server" in self._get_instrumentation_names() + + # ── Web frameworks ──────────────────────────────────────────── + + def test_fastapi_instrumented(self): + assert "fastapi" in self._get_instrumentation_names() + + def test_flask_instrumented(self): + assert "flask" in self._get_instrumentation_names() + + def test_django_instrumented(self): + assert "django" in self._get_instrumentation_names() + + def test_starlette_instrumented(self): 
+ assert "starlette" in self._get_instrumentation_names() + + def test_falcon_instrumented(self): + assert "falcon" in self._get_instrumentation_names() + + def test_pyramid_instrumented(self): + assert "pyramid" in self._get_instrumentation_names() + + def test_tornado_instrumented(self): + assert "tornado" in self._get_instrumentation_names() + + # ── Databases ───────────────────────────────────────────────── + + def test_sqlalchemy_instrumented(self): + assert "sqlalchemy" in self._get_instrumentation_names() + + def test_psycopg2_instrumented(self): + assert "psycopg2" in self._get_instrumentation_names() + + def test_psycopg_instrumented(self): + assert "psycopg" in self._get_instrumentation_names() + + def test_asyncpg_instrumented(self): + assert "asyncpg" in self._get_instrumentation_names() + + def test_aiopg_instrumented(self): + assert "aiopg" in self._get_instrumentation_names() + + def test_pymongo_instrumented(self): + assert "pymongo" in self._get_instrumentation_names() + + def test_redis_instrumented(self): + assert "redis" in self._get_instrumentation_names() + + def test_mysql_instrumented(self): + assert "mysql" in self._get_instrumentation_names() + + def test_pymysql_instrumented(self): + assert "pymysql" in self._get_instrumentation_names() + + def test_sqlite3_instrumented(self): + assert "sqlite3" in self._get_instrumentation_names() + + def test_elasticsearch_instrumented(self): + assert "elasticsearch" in self._get_instrumentation_names() + + def test_cassandra_instrumented(self): + assert "cassandra" in self._get_instrumentation_names() + + # ── Caching ─────────────────────────────────────────────────── + + def test_pymemcache_instrumented(self): + assert "pymemcache" in self._get_instrumentation_names() + + # ── Messaging ───────────────────────────────────────────────── + + def test_celery_instrumented(self): + assert "celery" in self._get_instrumentation_names() + + def test_kafka_python_instrumented(self): + assert "kafka-python" 
in self._get_instrumentation_names() + + def test_confluent_kafka_instrumented(self): + assert "confluent-kafka" in self._get_instrumentation_names() + + def test_aiokafka_instrumented(self): + assert "aiokafka" in self._get_instrumentation_names() + + def test_pika_instrumented(self): + assert "pika" in self._get_instrumentation_names() + + def test_aio_pika_instrumented(self): + assert "aio-pika" in self._get_instrumentation_names() + + # ── AWS ─────────────────────────────────────────────────────── + + def test_botocore_instrumented(self): + assert "botocore" in self._get_instrumentation_names() + + def test_boto3sqs_instrumented(self): + assert "boto3sqs" in self._get_instrumentation_names() + + # ── GenAI / AI ──────────────────────────────────────────────── + + def test_openai_instrumented(self): + assert "openai" in self._get_instrumentation_names() + + def test_anthropic_instrumented(self): + assert "anthropic" in self._get_instrumentation_names() + + def test_vertexai_instrumented(self): + assert "vertexai" in self._get_instrumentation_names() + + def test_google_genai_instrumented(self): + assert "google_genai" in self._get_instrumentation_names() + + def test_langchain_instrumented(self): + assert "langchain" in self._get_instrumentation_names() + + def test_ollama_instrumented(self): + assert "ollama" in self._get_instrumentation_names() + + def test_crewai_instrumented(self): + assert "crewai" in self._get_instrumentation_names() + + # ── Runtime ─────────────────────────────────────────────────── + + def test_logging_instrumented(self): + assert "logging" in self._get_instrumentation_names() + + def test_threading_instrumented(self): + assert "threading" in self._get_instrumentation_names() + + def test_asyncio_instrumented(self): + assert "asyncio" in self._get_instrumentation_names() + + +# --------------------------------------------------------------------------- +# Bootstrap: _try_instrument resilience +# 
--------------------------------------------------------------------------- + + +class TestTryInstrument: + """Tests for _try_instrument helper function.""" + + def test_missing_package_silently_skipped(self): + from botanu.sdk.bootstrap import _try_instrument + + enabled: list[str] = [] + failed: list[tuple[str, str]] = [] + _try_instrument(enabled, failed, "nonexistent", "nonexistent.module", "FooInstrumentor") + assert enabled == [] + assert failed == [] + + def test_instrument_error_recorded(self): + from botanu.sdk.bootstrap import _try_instrument + + enabled: list[str] = [] + failed: list[tuple[str, str]] = [] + # os module exists but has no 'FooInstrumentor' class + _try_instrument(enabled, failed, "os_fake", "os", "FooInstrumentor") + assert enabled == [] + assert len(failed) == 1 + assert failed[0][0] == "os_fake" + + +# --------------------------------------------------------------------------- +# Bootstrap: enable() / disable() / is_enabled() +# --------------------------------------------------------------------------- + + +class TestEnableDisable: + """Tests for bootstrap enable/disable lifecycle.""" + + def test_is_enabled_initially_false(self): + from botanu.sdk import bootstrap + + # Save and reset state + original = bootstrap._initialized + bootstrap._initialized = False + try: + assert bootstrap.is_enabled() is False + finally: + bootstrap._initialized = original + + def test_get_config_returns_none_when_not_initialized(self): + from botanu.sdk import bootstrap + + original_init = bootstrap._initialized + original_cfg = bootstrap._current_config + bootstrap._initialized = False + bootstrap._current_config = None + try: + assert bootstrap.get_config() is None + finally: + bootstrap._initialized = original_init + bootstrap._current_config = original_cfg + + +# --------------------------------------------------------------------------- +# Bootstrap: endpoint normalization in bootstrap +# 
--------------------------------------------------------------------------- + + +class TestEndpointNormalization: + """Verify bootstrap appends /v1/traces when needed.""" + + def test_base_endpoint_gets_v1_traces_appended(self): + """Config stores base URL; bootstrap should append /v1/traces.""" + with mock.patch.dict(os.environ, {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://collector:4318"}, clear=True): + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://collector:4318" + + # Simulate what bootstrap does + traces_endpoint = cfg.otlp_endpoint + if traces_endpoint and not traces_endpoint.endswith("/v1/traces"): + traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" + assert traces_endpoint == "http://collector:4318/v1/traces" + + def test_traces_endpoint_not_doubled(self): + """If already ends with /v1/traces, don't append again.""" + with mock.patch.dict( + os.environ, + {"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "http://collector:4318/v1/traces"}, + clear=True, + ): + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://collector:4318/v1/traces" + + traces_endpoint = cfg.otlp_endpoint + if traces_endpoint and not traces_endpoint.endswith("/v1/traces"): + traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" + assert traces_endpoint == "http://collector:4318/v1/traces" + + def test_botanu_endpoint_gets_v1_traces_appended(self): + """BOTANU_COLLECTOR_ENDPOINT also gets /v1/traces appended by bootstrap.""" + with mock.patch.dict( + os.environ, + {"BOTANU_COLLECTOR_ENDPOINT": "http://my-collector:4318"}, + clear=True, + ): + cfg = BotanuConfig() + assert cfg.otlp_endpoint == "http://my-collector:4318" + + traces_endpoint = cfg.otlp_endpoint + if traces_endpoint and not traces_endpoint.endswith("/v1/traces"): + traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" + assert traces_endpoint == "http://my-collector:4318/v1/traces" + + def test_trailing_slash_handled(self): + """Trailing slash on base endpoint should not cause double slash.""" + 
with mock.patch.dict( + os.environ, + {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://collector:4318/"}, + clear=True, + ): + cfg = BotanuConfig() + traces_endpoint = cfg.otlp_endpoint + if traces_endpoint and not traces_endpoint.endswith("/v1/traces"): + traces_endpoint = f"{traces_endpoint.rstrip('/')}/v1/traces" + assert traces_endpoint == "http://collector:4318/v1/traces" + + +# --------------------------------------------------------------------------- +# Bootstrap: thread safety +# --------------------------------------------------------------------------- + + +class TestBootstrapThreadSafety: + """Verify that enable() is thread-safe.""" + + def test_lock_exists(self): + """Bootstrap module must have a threading lock.""" + import threading + + from botanu.sdk import bootstrap + + assert hasattr(bootstrap, "_lock") + assert isinstance(bootstrap._lock, type(threading.RLock())) + + def test_concurrent_enable_only_initializes_once(self): + """Multiple threads calling enable() simultaneously should not race.""" + import threading + + from botanu.sdk import bootstrap + + # Reset state + original_init = bootstrap._initialized + original_cfg = bootstrap._current_config + bootstrap._initialized = False + bootstrap._current_config = None + + results = [] + barrier = threading.Barrier(5) + + def call_enable(): + barrier.wait() + try: + result = bootstrap.enable( + service_name="thread-test", + otlp_endpoint="http://localhost:4318", + auto_instrumentation=False, + ) + results.append(result) + except Exception: + results.append(None) + + try: + threads = [threading.Thread(target=call_enable) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + # Exactly one thread should get True (first to init), rest get False + true_count = sum(1 for r in results if r is True) + false_count = sum(1 for r in results if r is False) + assert true_count == 1, f"Expected exactly 1 True, got {true_count}" + assert false_count == 4, f"Expected 4 False, got 
{false_count}" + finally: + bootstrap._initialized = original_init + bootstrap._current_config = original_cfg + + +# --------------------------------------------------------------------------- +# Bootstrap: full lifecycle +# --------------------------------------------------------------------------- + + +class TestBootstrapLifecycle: + """Tests for enable/disable full lifecycle.""" + + def test_disable_when_not_initialized_is_noop(self): + from botanu.sdk import bootstrap + + original = bootstrap._initialized + bootstrap._initialized = False + try: + bootstrap.disable() # Should not raise + finally: + bootstrap._initialized = original + + def test_disable_clears_config(self): + from botanu.sdk import bootstrap + + original_init = bootstrap._initialized + original_cfg = bootstrap._current_config + + bootstrap._initialized = True + bootstrap._current_config = BotanuConfig(service_name="test") + + try: + # Mock the tracer provider to avoid shutting down the real test provider + mock_provider = mock.MagicMock() + with mock.patch("opentelemetry.trace.get_tracer_provider", return_value=mock_provider): + bootstrap.disable() + assert bootstrap._current_config is None + assert bootstrap._initialized is False + mock_provider.force_flush.assert_called_once() + mock_provider.shutdown.assert_called_once() + finally: + bootstrap._initialized = original_init + bootstrap._current_config = original_cfg + + def test_is_enabled_reflects_state(self): + from botanu.sdk import bootstrap + + original = bootstrap._initialized + + try: + bootstrap._initialized = True + assert bootstrap.is_enabled() is True + bootstrap._initialized = False + assert bootstrap.is_enabled() is False + finally: + bootstrap._initialized = original + + def test_get_config_returns_config_when_set(self): + from botanu.sdk import bootstrap + + original_init = bootstrap._initialized + original_cfg = bootstrap._current_config + + test_cfg = BotanuConfig(service_name="my-svc") + bootstrap._current_config = test_cfg + + 
try: + assert bootstrap.get_config() is test_cfg + finally: + bootstrap._initialized = original_init + bootstrap._current_config = original_cfg + + +# --------------------------------------------------------------------------- +# Bootstrap: no-sampling guarantee +# --------------------------------------------------------------------------- + + +class TestNoSamplingGuarantee: + """Botanu NEVER samples or drops spans.""" + + def test_always_on_sampler_in_bootstrap(self): + """Bootstrap source must use ALWAYS_ON sampler explicitly.""" + import inspect + + from botanu.sdk import bootstrap + + source = inspect.getsource(bootstrap.enable) + assert "ALWAYS_ON" in source, "enable() must use ALWAYS_ON sampler" + assert "sampler=ALWAYS_ON" in source, "TracerProvider must have sampler=ALWAYS_ON" + + def test_no_sampling_imports_in_codebase(self): + """SDK must never import ratio or parent-based samplers.""" + import inspect + + from botanu.sdk import bootstrap + + source = inspect.getsource(bootstrap) + # These samplers would enable span dropping + assert "TraceIdRatio" not in source + assert "ParentBased" not in source + assert "ALWAYS_OFF" not in source + + def test_otel_traces_sampler_env_var_warning(self): + """Setting OTEL_TRACES_SAMPLER should log a warning, not enable sampling.""" + import inspect + + from botanu.sdk import bootstrap + + source = inspect.getsource(bootstrap.enable) + assert "OTEL_TRACES_SAMPLER" in source, "enable() must check for OTEL_TRACES_SAMPLER env var and warn" + + def test_conftest_uses_always_on(self): + """Test provider must also use ALWAYS_ON to match production behavior.""" + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + + from tests.conftest import _get_or_create_provider + + provider, _ = _get_or_create_provider() + assert provider.sampler is ALWAYS_ON + + +# --------------------------------------------------------------------------- +# Bootstrap: provider reuse (no double-spanning) +# 
--------------------------------------------------------------------------- + + +class TestProviderReuse: + """Botanu must not create a second TracerProvider if one already exists.""" + + def test_reuse_existing_provider_code_path(self): + """Bootstrap source must check for existing TracerProvider.""" + import inspect + + from botanu.sdk import bootstrap + + source = inspect.getsource(bootstrap.enable) + assert "get_tracer_provider" in source, "enable() must check for existing TracerProvider" + assert "isinstance" in source, "enable() must use isinstance to check provider type" diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 117d75b..88eb9cb 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -66,11 +66,11 @@ def test_env_var_environment(self): assert config.deployment_environment == "staging" def test_env_var_otlp_endpoint_base(self): - """OTEL_EXPORTER_OTLP_ENDPOINT gets /v1/traces appended.""" + """OTEL_EXPORTER_OTLP_ENDPOINT is stored as base; bootstrap appends /v1/traces.""" with mock.patch.dict(os.environ, {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://collector:4318"}): config = BotanuConfig() - # Base endpoint gets /v1/traces appended - assert config.otlp_endpoint == "http://collector:4318/v1/traces" + # Base endpoint stored as-is; bootstrap.py appends /v1/traces + assert config.otlp_endpoint == "http://collector:4318" def test_env_var_otlp_traces_endpoint_direct(self): """OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is used directly without appending.""" @@ -189,6 +189,163 @@ def test_to_dict(self): assert d["otlp"]["endpoint"] == "http://localhost:4318" +class TestBotanuConfigExportTuning: + """Tests for export tuning env vars (queue, batch, timeout).""" + + def test_default_export_values(self): + with mock.patch.dict(os.environ, {}, clear=True): + for key in ["BOTANU_MAX_QUEUE_SIZE", "BOTANU_MAX_EXPORT_BATCH_SIZE", "BOTANU_EXPORT_TIMEOUT_MILLIS"]: + os.environ.pop(key, None) + config = BotanuConfig() + assert 
config.max_queue_size == 65536 + assert config.max_export_batch_size == 512 + assert config.export_timeout_millis == 30000 + + def test_env_var_max_queue_size(self): + with mock.patch.dict(os.environ, {"BOTANU_MAX_QUEUE_SIZE": "131072"}): + config = BotanuConfig() + assert config.max_queue_size == 131072 + + def test_env_var_max_export_batch_size(self): + with mock.patch.dict(os.environ, {"BOTANU_MAX_EXPORT_BATCH_SIZE": "1024"}): + config = BotanuConfig() + assert config.max_export_batch_size == 1024 + + def test_env_var_export_timeout_millis(self): + with mock.patch.dict(os.environ, {"BOTANU_EXPORT_TIMEOUT_MILLIS": "60000"}): + config = BotanuConfig() + assert config.export_timeout_millis == 60000 + + def test_invalid_queue_size_ignored(self): + with mock.patch.dict(os.environ, {"BOTANU_MAX_QUEUE_SIZE": "not_a_number"}): + config = BotanuConfig() + assert config.max_queue_size == 65536 + + def test_invalid_batch_size_ignored(self): + with mock.patch.dict(os.environ, {"BOTANU_MAX_EXPORT_BATCH_SIZE": "bad"}): + config = BotanuConfig() + assert config.max_export_batch_size == 512 + + def test_invalid_timeout_ignored(self): + with mock.patch.dict(os.environ, {"BOTANU_EXPORT_TIMEOUT_MILLIS": "abc"}): + config = BotanuConfig() + assert config.export_timeout_millis == 30000 + + +class TestBotanuConfigFromYamlExport: + """Tests for YAML export configuration parsing.""" + + def test_from_yaml_with_export_config(self, tmp_path): + yaml_content = """ +service: + name: yaml-export-test +export: + batch_size: 256 + queue_size: 32768 + delay_ms: 2000 + export_timeout_ms: 15000 +""" + yaml_file = tmp_path / "config.yaml" + yaml_file.write_text(yaml_content) + + config = BotanuConfig.from_yaml(str(yaml_file)) + assert config.max_export_batch_size == 256 + assert config.max_queue_size == 32768 + assert config.schedule_delay_millis == 2000 + assert config.export_timeout_millis == 15000 + + def test_from_yaml_export_defaults(self, tmp_path): + """YAML without export section uses 
defaults.""" + yaml_content = """ +service: + name: minimal +""" + yaml_file = tmp_path / "config.yaml" + yaml_file.write_text(yaml_content) + + config = BotanuConfig.from_yaml(str(yaml_file)) + assert config.max_export_batch_size == 512 + assert config.max_queue_size == 65536 + assert config.export_timeout_millis == 30000 + + +class TestBotanuConfigToDictExport: + """Tests for to_dict roundtrip with export params.""" + + def test_to_dict_includes_export_timeout(self): + config = BotanuConfig(export_timeout_millis=45000) + d = config.to_dict() + assert d["export"]["export_timeout_ms"] == 45000 + + def test_to_dict_roundtrip(self, tmp_path): + """to_dict output should be loadable by _from_dict.""" + original = BotanuConfig( + service_name="roundtrip", + max_queue_size=4096, + max_export_batch_size=128, + export_timeout_millis=10000, + ) + d = original.to_dict() + d["auto_instrument_packages"] = original.auto_instrument_packages + restored = BotanuConfig._from_dict(d) + assert restored.max_queue_size == 4096 + assert restored.max_export_batch_size == 128 + assert restored.export_timeout_millis == 10000 + + +class TestBotanuConfigPrecedence: + """Tests for BOTANU_* > OTEL_* > default precedence.""" + + def test_botanu_service_name_over_otel(self): + with mock.patch.dict( + os.environ, + { + "BOTANU_SERVICE_NAME": "botanu-svc", + "OTEL_SERVICE_NAME": "otel-svc", + }, + ): + config = BotanuConfig() + assert config.service_name == "botanu-svc" + + def test_botanu_endpoint_over_otel(self): + with mock.patch.dict( + os.environ, + { + "BOTANU_COLLECTOR_ENDPOINT": "http://botanu:4318", + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://otel:4318", + }, + ): + config = BotanuConfig() + assert config.otlp_endpoint == "http://botanu:4318" + + def test_botanu_environment_over_otel(self): + with mock.patch.dict( + os.environ, + { + "BOTANU_ENVIRONMENT": "staging", + "OTEL_DEPLOYMENT_ENVIRONMENT": "production", + }, + ): + config = BotanuConfig() + assert config.deployment_environment == 
"staging" + + def test_propagation_mode_rejects_invalid(self): + with mock.patch.dict(os.environ, {"BOTANU_PROPAGATION_MODE": "invalid"}): + config = BotanuConfig() + assert config.propagation_mode == "lean" + + def test_auto_detect_resources_env_false(self): + with mock.patch.dict(os.environ, {"BOTANU_AUTO_DETECT_RESOURCES": "false"}): + config = BotanuConfig() + assert config.auto_detect_resources is False + + def test_auto_detect_resources_truthy_values(self): + for truthy in ("true", "1", "yes"): + with mock.patch.dict(os.environ, {"BOTANU_AUTO_DETECT_RESOURCES": truthy}): + config = BotanuConfig() + assert config.auto_detect_resources is True + + class TestBotanuConfigAutoInstrument: """Tests for auto-instrumentation configuration.""" diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 77f7ded..91feca4 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -12,6 +12,7 @@ get_current_span, get_run_id, get_use_case, + get_workflow, set_baggage, ) @@ -45,6 +46,14 @@ def test_get_use_case(self): set_baggage("botanu.use_case", "Customer Support") assert get_use_case() == "Customer Support" + def test_get_workflow(self): + set_baggage("botanu.workflow", "ticket_handler") + assert get_workflow() == "ticket_handler" + + def test_get_workflow_not_set(self): + result = get_workflow() + assert result is None or isinstance(result, str) + class TestSpanHelpers: """Tests for span helper functions.""" @@ -61,3 +70,38 @@ def test_get_current_span_no_active_span(self): assert span is not None # Non-recording spans have is_recording() == False assert not span.is_recording() + + +class TestSetBaggageTokenManagement: + """Tests for set_baggage token lifecycle and context management.""" + + def test_set_baggage_returns_detachable_token(self): + from opentelemetry.context import detach + + token = set_baggage("botanu.token_test", "val1") + assert token is not None + assert get_baggage("botanu.token_test") == "val1" + detach(token) + + def 
test_multiple_set_baggage_stacks_values(self): + token1 = set_baggage("botanu.stack_a", "a") + token2 = set_baggage("botanu.stack_b", "b") + + assert get_baggage("botanu.stack_a") == "a" + assert get_baggage("botanu.stack_b") == "b" + assert token1 is not None + assert token2 is not None + + def test_overwrite_same_key(self): + set_baggage("botanu.overwrite", "first") + set_baggage("botanu.overwrite", "second") + assert get_baggage("botanu.overwrite") == "second" + + def test_get_baggage_returns_none_in_clean_context(self): + from opentelemetry import context as otel_context + + token = otel_context.attach(otel_context.Context()) + try: + assert get_baggage("botanu.surely_missing") is None + finally: + otel_context.detach(token) diff --git a/tests/unit/test_data_tracking.py b/tests/unit/test_data_tracking.py index 3c6680e..a9990be 100644 --- a/tests/unit/test_data_tracking.py +++ b/tests/unit/test_data_tracking.py @@ -207,3 +207,267 @@ def test_messaging_operations(self): assert MessagingOperation.PUBLISH == "publish" assert MessagingOperation.RECEIVE == "receive" assert MessagingOperation.CONSUME == "consume" + + +class TestSystemNormalization: + """Tests for system name normalization maps.""" + + def test_db_system_aliases(self, memory_exporter): + from botanu.tracking.data import DB_SYSTEMS + + assert DB_SYSTEMS["postgres"] == "postgresql" + assert DB_SYSTEMS["pg"] == "postgresql" + assert DB_SYSTEMS["mongo"] == "mongodb" + assert DB_SYSTEMS["sqlserver"] == "mssql" + + def test_storage_system_aliases(self): + from botanu.tracking.data import STORAGE_SYSTEMS + + assert STORAGE_SYSTEMS["aws_s3"] == "s3" + assert STORAGE_SYSTEMS["google_cloud_storage"] == "gcs" + assert STORAGE_SYSTEMS["blob"] == "azure_blob" + + def test_messaging_system_aliases(self): + from botanu.tracking.data import MESSAGING_SYSTEMS + + assert MESSAGING_SYSTEMS["aws_sqs"] == "sqs" + assert MESSAGING_SYSTEMS["google_pubsub"] == "pubsub" + assert MESSAGING_SYSTEMS["azure_servicebus"] == 
"servicebus" + + def test_db_alias_used_in_span(self, memory_exporter): + """Alias 'pg' should normalize to 'postgresql' in the span.""" + with track_db_operation(system="pg", operation="SELECT") as tracker: + tracker.set_result(rows_returned=1) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["db.system"] == "postgresql" + + def test_unknown_system_passthrough(self, memory_exporter): + """Unknown systems should pass through as lowercase.""" + with track_db_operation(system="CockroachDB", operation="SELECT"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["db.system"] == "cockroachdb" + + +class TestDBTrackerMetadata: + """Tests for DBTracker.add_metadata and set_bytes_scanned.""" + + def test_add_metadata(self, memory_exporter): + with track_db_operation(system="postgresql", operation="SELECT") as tracker: + tracker.add_metadata(query_plan="seq_scan", cost_estimate=42.5) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.data.query_plan"] == "seq_scan" + assert attrs["botanu.data.cost_estimate"] == 42.5 + + def test_add_metadata_preserves_botanu_prefix(self, memory_exporter): + with track_db_operation(system="postgresql", operation="SELECT") as tracker: + tracker.add_metadata(**{"botanu.custom_key": "custom_val"}) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.custom_key"] == "custom_val" + + def test_set_bytes_scanned(self, memory_exporter): + with track_db_operation(system="bigquery", operation="SELECT") as tracker: + tracker.set_bytes_scanned(5_000_000) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.warehouse.bytes_scanned"] == 5_000_000 + assert tracker.bytes_read == 5_000_000 + + def test_duration_finalized(self, memory_exporter): + with 
track_db_operation(system="postgresql", operation="INSERT"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert "botanu.data.duration_ms" in attrs + assert attrs["botanu.data.duration_ms"] >= 0 + + +class TestStorageTrackerMetadata: + """Tests for StorageTracker.add_metadata.""" + + def test_add_metadata(self, memory_exporter): + with track_storage_operation(system="s3", operation="PUT") as tracker: + tracker.add_metadata(content_type="application/json", region="us-east-1") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.storage.content_type"] == "application/json" + assert attrs["botanu.storage.region"] == "us-east-1" + + def test_duration_finalized(self, memory_exporter): + with track_storage_operation(system="gcs", operation="GET"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert "botanu.storage.duration_ms" in attrs + + +class TestMessagingTrackerMetadata: + """Tests for MessagingTracker.add_metadata and span kind.""" + + def test_add_metadata(self, memory_exporter): + with track_messaging_operation( + system="kafka", + operation="publish", + destination="events", + ) as tracker: + tracker.add_metadata(partition=3, key="order-123") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.messaging.partition"] == 3 + assert attrs["botanu.messaging.key"] == "order-123" + + def test_publish_uses_producer_span_kind(self, memory_exporter): + from opentelemetry.trace import SpanKind + + with track_messaging_operation( + system="kafka", + operation="publish", + destination="topic", + ): + pass + + spans = memory_exporter.get_finished_spans() + assert spans[0].kind == SpanKind.PRODUCER + + def test_consume_uses_consumer_span_kind(self, memory_exporter): + from opentelemetry.trace import SpanKind + + with track_messaging_operation( + system="kafka", + 
operation="consume", + destination="topic", + ): + pass + + spans = memory_exporter.get_finished_spans() + assert spans[0].kind == SpanKind.CONSUMER + + def test_send_uses_producer_span_kind(self, memory_exporter): + from opentelemetry.trace import SpanKind + + with track_messaging_operation( + system="sqs", + operation="send", + destination="queue", + ): + pass + + spans = memory_exporter.get_finished_spans() + assert spans[0].kind == SpanKind.PRODUCER + + def test_duration_finalized(self, memory_exporter): + with track_messaging_operation( + system="sqs", + operation="receive", + destination="q", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert "botanu.messaging.duration_ms" in attrs + + +class TestStandaloneHelpers: + """Tests for set_data_metrics and set_warehouse_metrics.""" + + def test_set_data_metrics(self, memory_exporter): + from opentelemetry import trace as otl_trace + + from botanu.tracking.data import set_data_metrics + + tracer = otl_trace.get_tracer("test") + with tracer.start_as_current_span("test-data-metrics"): + set_data_metrics(rows_returned=100, bytes_read=8192) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.data.rows_returned"] == 100 + assert attrs["botanu.data.bytes_read"] == 8192 + + def test_set_data_metrics_no_active_span(self): + from botanu.tracking.data import set_data_metrics + + # Should not raise when no recording span + set_data_metrics(rows_returned=10) + + def test_set_warehouse_metrics(self, memory_exporter): + from opentelemetry import trace as otl_trace + + from botanu.tracking.data import set_warehouse_metrics + + tracer = otl_trace.get_tracer("test") + with tracer.start_as_current_span("test-warehouse"): + set_warehouse_metrics( + query_id="q-001", + bytes_scanned=10_000_000, + rows_returned=500, + partitions_scanned=12, + ) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) 
+ assert attrs["botanu.warehouse.query_id"] == "q-001" + assert attrs["botanu.warehouse.bytes_scanned"] == 10_000_000 + assert attrs["botanu.data.rows_returned"] == 500 + assert attrs["botanu.warehouse.partitions_scanned"] == 12 + + def test_set_warehouse_metrics_no_active_span(self): + from botanu.tracking.data import set_warehouse_metrics + + # Should not raise when no recording span + set_warehouse_metrics(query_id="q-002", bytes_scanned=1000) + + +class TestKwargsPassthrough: + """Tests for additional kwargs passed to context managers.""" + + def test_db_operation_kwargs(self, memory_exporter): + with track_db_operation( + system="postgresql", + operation="SELECT", + statement="SELECT 1", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.data.statement"] == "SELECT 1" + + def test_storage_operation_kwargs(self, memory_exporter): + with track_storage_operation( + system="s3", + operation="GET", + bucket="my-bucket", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.storage.bucket"] == "my-bucket" + + def test_messaging_operation_kwargs(self, memory_exporter): + with track_messaging_operation( + system="kafka", + operation="publish", + destination="topic", + partition_key="order-1", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.messaging.partition_key"] == "order-1" diff --git a/tests/unit/test_decorators.py b/tests/unit/test_decorators.py index e7b7dc6..b63f906 100644 --- a/tests/unit/test_decorators.py +++ b/tests/unit/test_decorators.py @@ -6,8 +6,19 @@ from __future__ import annotations import pytest +from opentelemetry import baggage, trace +from opentelemetry import context as otel_context +from opentelemetry.context import get_current -from botanu.sdk.decorators import botanu_use_case +from botanu.sdk.decorators import botanu_outcome, 
botanu_use_case + + +@pytest.fixture(autouse=True) +def _clean_otel_context(): + """Reset OTel context before each test to avoid baggage leaking between tests.""" + token = otel_context.attach(otel_context.Context()) + yield + otel_context.detach(token) class TestBotanuUseCaseDecorator: @@ -122,3 +133,203 @@ def versioned_function(): assert "botanu.workflow.version" in attrs assert attrs["botanu.workflow.version"].startswith("v:") + + def test_return_value_preserved(self, memory_exporter): + @botanu_use_case("Test") + def returns_dict(): + return {"key": "value", "count": 42} + + result = returns_dict() + assert result == {"key": "value", "count": 42} + + @pytest.mark.asyncio + async def test_async_return_value_preserved(self, memory_exporter): + @botanu_use_case("Test") + async def returns_data(): + return [1, 2, 3] + + result = await returns_data() + assert result == [1, 2, 3] + + def test_exception_re_raised(self, memory_exporter): + @botanu_use_case("Test") + def raises(): + raise TypeError("bad type") + + with pytest.raises(TypeError, match="bad type"): + raises() + + def test_outcome_status_set_on_success(self, memory_exporter): + @botanu_use_case("Test") + def my_fn(): + return "ok" + + my_fn() + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.outcome.status"] == "success" + + def test_outcome_status_set_on_failure(self, memory_exporter): + @botanu_use_case("Test") + def failing(): + raise RuntimeError("boom") + + with pytest.raises(RuntimeError): + failing() + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.outcome.status"] == "failure" + + def test_duration_ms_recorded(self, memory_exporter): + @botanu_use_case("Test") + def quick_fn(): + return "done" + + quick_fn() + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert "botanu.run.duration_ms" in attrs + assert attrs["botanu.run.duration_ms"] >= 0 + + def 
test_custom_span_kind(self, memory_exporter): + from opentelemetry.trace import SpanKind + + @botanu_use_case("Test", span_kind=SpanKind.CLIENT) + def client_fn(): + return "ok" + + client_fn() + spans = memory_exporter.get_finished_spans() + assert spans[0].kind == SpanKind.CLIENT + + def test_root_run_id_equals_run_id_for_root(self, memory_exporter): + @botanu_use_case("Test") + def root_fn(): + return "root" + + root_fn() + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + # For a root run, root_run_id should equal run_id + assert attrs["botanu.root_run_id"] == attrs["botanu.run_id"] + + def test_tenant_id_propagated(self, memory_exporter): + @botanu_use_case("Test", tenant_id="tenant-abc") + def tenant_fn(): + return "ok" + + tenant_fn() + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.tenant_id"] == "tenant-abc" + + def test_baggage_cleaned_up_after_sync(self, memory_exporter): + """Verify baggage does NOT leak after the decorated function completes.""" + + @botanu_use_case("Leak Test") + def my_fn(): + # Inside the function, baggage should be set + assert baggage.get_baggage("botanu.run_id", get_current()) is not None + return "ok" + + # Before: no baggage + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + my_fn() + + # After: baggage must be cleaned up (detached) + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + @pytest.mark.asyncio + async def test_baggage_cleaned_up_after_async(self, memory_exporter): + """Verify baggage does NOT leak after an async decorated function.""" + + @botanu_use_case("Async Leak Test") + async def my_fn(): + assert baggage.get_baggage("botanu.run_id", get_current()) is not None + return "ok" + + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + await my_fn() + + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + def 
test_baggage_cleaned_up_after_exception(self, memory_exporter): + """Verify baggage is cleaned up even when the function raises.""" + + @botanu_use_case("Exception Leak Test") + def failing_fn(): + raise RuntimeError("boom") + + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + with pytest.raises(RuntimeError): + failing_fn() + + # Must be cleaned up despite the exception + assert baggage.get_baggage("botanu.run_id", get_current()) is None + + +class TestBotanuOutcomeDecorator: + """Tests for @botanu_outcome decorator.""" + + def test_sync_success_emits_outcome(self, memory_exporter): + tracer_instance = trace.get_tracer("test") + + @botanu_outcome() + def my_fn(): + return "ok" + + with tracer_instance.start_as_current_span("parent"): + result = my_fn() + + assert result == "ok" + + def test_sync_failure_emits_failed(self, memory_exporter): + tracer_instance = trace.get_tracer("test") + + @botanu_outcome() + def failing_fn(): + raise ValueError("broken") + + with tracer_instance.start_as_current_span("parent"): + with pytest.raises(ValueError, match="broken"): + failing_fn() + + @pytest.mark.asyncio + async def test_async_success_emits_outcome(self, memory_exporter): + tracer_instance = trace.get_tracer("test") + + @botanu_outcome() + async def async_fn(): + return "async ok" + + with tracer_instance.start_as_current_span("parent"): + result = await async_fn() + + assert result == "async ok" + + @pytest.mark.asyncio + async def test_async_failure_emits_failed(self, memory_exporter): + tracer_instance = trace.get_tracer("test") + + @botanu_outcome() + async def async_fail(): + raise RuntimeError("async boom") + + with tracer_instance.start_as_current_span("parent"): + with pytest.raises(RuntimeError, match="async boom"): + await async_fail() + + def test_exception_re_raised(self, memory_exporter): + tracer_instance = trace.get_tracer("test") + + @botanu_outcome() + def raises(): + raise TypeError("type err") + + with 
tracer_instance.start_as_current_span("parent"): + with pytest.raises(TypeError, match="type err"): + raises() diff --git a/tests/unit/test_ledger.py b/tests/unit/test_ledger.py index c4ea3e3..9c492b2 100644 --- a/tests/unit/test_ledger.py +++ b/tests/unit/test_ledger.py @@ -275,3 +275,221 @@ def test_record_tool_attempted(self): record_tool_attempted(run_id="run-123", tool_name="search") mock_ledger.tool_attempted.assert_called_once_with(run_id="run-123", tool_name="search") + + +class TestAttemptLedgerEmitMocked: + """Tests for ledger methods with mocked _emit to verify event attributes.""" + + def _make_ledger(self): + ledger = AttemptLedger.__new__(AttemptLedger) + ledger._initialized = True + ledger._logger = mock.MagicMock() + ledger.service_name = "test-svc" + return ledger + + def test_attempt_started_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.attempt_started( + run_id="run-100", + use_case="billing", + attempt=2, + root_run_id="root-50", + workflow="invoice", + tenant_id="t-001", + deadline_ts=1700000000.0, + ) + + ledger._emit.assert_called_once() + event_type, _severity, attrs = ledger._emit.call_args[0] + assert event_type == LedgerEventType.ATTEMPT_STARTED + assert attrs["botanu.run_id"] == "run-100" + assert attrs["botanu.use_case"] == "billing" + assert attrs["botanu.attempt"] == 2 + assert attrs["botanu.root_run_id"] == "root-50" + assert attrs["botanu.workflow"] == "invoice" + assert attrs["botanu.tenant_id"] == "t-001" + assert attrs["botanu.deadline_ts"] == 1700000000.0 + + def test_attempt_ended_success(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.attempt_ended( + run_id="run-200", + status="success", + duration_ms=1500.0, + ) + + _, _severity, attrs = ledger._emit.call_args[0] + assert attrs["botanu.run_id"] == "run-200" + assert attrs["status"] == "success" + assert attrs["duration_ms"] == 1500.0 + + def test_attempt_ended_error(self): + ledger = 
self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.attempt_ended( + run_id="run-201", + status="error", + error_class="ValueError", + reason_code="INVALID_INPUT", + ) + + _, _severity, attrs = ledger._emit.call_args[0] + assert attrs["status"] == "error" + assert attrs["error_class"] == "ValueError" + assert attrs["reason_code"] == "INVALID_INPUT" + + def test_llm_attempted_full_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.llm_attempted( + run_id="run-300", + provider="openai", + model="gpt-4", + operation="chat", + attempt_number=1, + input_tokens=500, + output_tokens=200, + cached_tokens=100, + duration_ms=800.0, + status="success", + provider_request_id="resp-abc", + estimated_cost_usd=0.0075, + ) + + _, _, attrs = ledger._emit.call_args[0] + assert attrs["gen_ai.provider.name"] == "openai" + assert attrs["gen_ai.request.model"] == "gpt-4" + assert attrs["gen_ai.usage.input_tokens"] == 500 + assert attrs["gen_ai.usage.output_tokens"] == 200 + assert attrs["botanu.usage.cached_tokens"] == 100 + assert attrs["botanu.cost.estimated_usd"] == 0.0075 + + def test_tool_attempted_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.tool_attempted( + run_id="run-400", + tool_name="search", + tool_call_id="call-xyz", + duration_ms=250.0, + items_returned=3, + bytes_processed=4096, + ) + + _, _, attrs = ledger._emit.call_args[0] + assert attrs["gen_ai.tool.name"] == "search" + assert attrs["gen_ai.tool.call.id"] == "call-xyz" + assert attrs["items_returned"] == 3 + assert attrs["bytes_processed"] == 4096 + + def test_cancel_requested_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.cancel_requested( + run_id="run-500", + reason="timeout", + requested_at_ms=1700000001000.0, + ) + + event_type, _, attrs = ledger._emit.call_args[0] + assert event_type == LedgerEventType.CANCEL_REQUESTED + assert attrs["cancellation.reason"] == 
"timeout" + assert attrs["cancellation.requested_at_ms"] == 1700000001000.0 + + def test_cancel_acknowledged_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.cancel_acknowledged( + run_id="run-600", + acknowledged_by="worker-3", + latency_ms=150.0, + ) + + event_type, _, attrs = ledger._emit.call_args[0] + assert event_type == LedgerEventType.CANCEL_ACKNOWLEDGED + assert attrs["cancellation.acknowledged_by"] == "worker-3" + assert attrs["cancellation.latency_ms"] == 150.0 + + def test_zombie_detected_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.zombie_detected( + run_id="run-700", + deadline_ts=1000.0, + actual_end_ts=5000.0, + zombie_duration_ms=4000.0, + component="agent_loop", + ) + + event_type, _, attrs = ledger._emit.call_args[0] + assert event_type == LedgerEventType.ZOMBIE_DETECTED + assert attrs["zombie_duration_ms"] == 4000.0 + assert attrs["zombie_component"] == "agent_loop" + + def test_redelivery_detected_attributes(self): + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.redelivery_detected( + run_id="run-800", + queue_name="tasks-queue", + delivery_count=3, + original_message_id="msg-original", + ) + + event_type, _, attrs = ledger._emit.call_args[0] + assert event_type == LedgerEventType.REDELIVERY_DETECTED + assert attrs["queue.name"] == "tasks-queue" + assert attrs["delivery_count"] == 3 + assert attrs["original_message_id"] == "msg-original" + + def test_attempt_started_default_root_run_id(self): + """root_run_id defaults to run_id when not provided.""" + ledger = self._make_ledger() + ledger._emit = mock.MagicMock() + + ledger.attempt_started(run_id="run-solo", use_case="test") + + _, _, attrs = ledger._emit.call_args[0] + assert attrs["botanu.root_run_id"] == "run-solo" + + def test_cancel_requested_auto_timestamp(self): + """requested_at_ms uses current time when not provided.""" + ledger = self._make_ledger() + ledger._emit 
= mock.MagicMock() + + ledger.cancel_requested(run_id="run-ts", reason="user") + + _, _, attrs = ledger._emit.call_args[0] + assert attrs["cancellation.requested_at_ms"] > 0 + + +class TestLedgerGlobalReset: + """Tests for global ledger cleanup.""" + + def test_set_ledger_overrides_default(self): + import botanu.tracking.ledger as ledger_module + + ledger_module._global_ledger = None + default = get_ledger() + + custom = AttemptLedger.__new__(AttemptLedger) + custom._initialized = False + custom.service_name = "override" + set_ledger(custom) + + assert get_ledger() is custom + assert get_ledger() is not default + + # Cleanup + ledger_module._global_ledger = None diff --git a/tests/unit/test_llm_tracking.py b/tests/unit/test_llm_tracking.py index c9b7b58..dd09cf9 100644 --- a/tests/unit/test_llm_tracking.py +++ b/tests/unit/test_llm_tracking.py @@ -305,3 +305,233 @@ def test_botanu_attributes(self): assert BotanuAttributes.CACHE_HIT == "botanu.request.cache_hit" assert BotanuAttributes.ATTEMPT_NUMBER == "botanu.request.attempt" assert BotanuAttributes.VENDOR == "botanu.vendor" + + +class TestTrackToolCall: + """Tests for track_tool_call context manager.""" + + def test_creates_span(self, memory_exporter): + from botanu.tracking.llm import track_tool_call + + with track_tool_call(tool_name="search"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "execute_tool search" + + def test_tool_call_attributes(self, memory_exporter): + from botanu.tracking.llm import track_tool_call + + with track_tool_call( + tool_name="web_search", + tool_call_id="call_abc123", + provider="tavily", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.TOOL_NAME] == "web_search" + assert attrs[GenAIAttributes.TOOL_CALL_ID] == "call_abc123" + assert attrs[GenAIAttributes.OPERATION_NAME] == "execute_tool" + + def test_tool_tracker_set_result(self, 
memory_exporter): + from botanu.tracking.llm import BotanuAttributes, track_tool_call + + with track_tool_call(tool_name="db_query") as tracker: + tracker.set_result(success=True, items_returned=42, bytes_processed=8192) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[BotanuAttributes.TOOL_SUCCESS] is True + assert attrs[BotanuAttributes.TOOL_ITEMS_RETURNED] == 42 + assert attrs[BotanuAttributes.TOOL_BYTES_PROCESSED] == 8192 + + def test_tool_tracker_set_error(self, memory_exporter): + from botanu.tracking.llm import track_tool_call + + with pytest.raises(ConnectionError): + with track_tool_call(tool_name="api_call"): + raise ConnectionError("Service down") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.ERROR_TYPE] == "ConnectionError" + + def test_tool_tracker_set_tool_call_id(self, memory_exporter): + from botanu.tracking.llm import track_tool_call + + with track_tool_call(tool_name="calc") as tracker: + tracker.set_tool_call_id("call_xyz789") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.TOOL_CALL_ID] == "call_xyz789" + + def test_tool_tracker_add_metadata(self, memory_exporter): + from botanu.tracking.llm import track_tool_call + + with track_tool_call(tool_name="search") as tracker: + tracker.add_metadata(query="python otel", source="web") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.tool.query"] == "python otel" + assert attrs["botanu.tool.source"] == "web" + + def test_tool_duration_recorded(self, memory_exporter): + from botanu.tracking.llm import BotanuAttributes, track_tool_call + + with track_tool_call(tool_name="slow_tool"): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert BotanuAttributes.TOOL_DURATION_MS in attrs + assert 
attrs[BotanuAttributes.TOOL_DURATION_MS] >= 0 + + +class TestStandaloneHelpers: + """Tests for set_llm_attributes and set_token_usage.""" + + def test_set_llm_attributes(self, memory_exporter): + from opentelemetry import trace as otl_trace + + from botanu.tracking.llm import BotanuAttributes, set_llm_attributes + + tracer = otl_trace.get_tracer("test") + with tracer.start_as_current_span("test-llm-attrs"): + set_llm_attributes( + provider="openai", + model="gpt-4", + input_tokens=150, + output_tokens=75, + streaming=True, + provider_request_id="resp_abc", + ) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.PROVIDER_NAME] == "openai" + assert attrs[GenAIAttributes.REQUEST_MODEL] == "gpt-4" + assert attrs[GenAIAttributes.USAGE_INPUT_TOKENS] == 150 + assert attrs[GenAIAttributes.USAGE_OUTPUT_TOKENS] == 75 + assert attrs[BotanuAttributes.STREAMING] is True + assert attrs[GenAIAttributes.RESPONSE_ID] == "resp_abc" + + def test_set_llm_attributes_no_active_span(self): + from botanu.tracking.llm import set_llm_attributes + + # Should not raise when no recording span + set_llm_attributes(provider="openai", model="gpt-4") + + def test_set_token_usage(self, memory_exporter): + from opentelemetry import trace as otl_trace + + from botanu.tracking.llm import BotanuAttributes, set_token_usage + + tracer = otl_trace.get_tracer("test") + with tracer.start_as_current_span("test-token-usage"): + set_token_usage(input_tokens=200, output_tokens=100, cached_tokens=50) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.USAGE_INPUT_TOKENS] == 200 + assert attrs[GenAIAttributes.USAGE_OUTPUT_TOKENS] == 100 + assert attrs[BotanuAttributes.TOKENS_CACHED] == 50 + + def test_set_token_usage_no_active_span(self): + from botanu.tracking.llm import set_token_usage + + # Should not raise when no recording span + set_token_usage(input_tokens=10, output_tokens=5) + + 
+class TestLLMInstrumentedDecorator: + """Tests for the llm_instrumented decorator.""" + + def test_decorator_creates_span(self, memory_exporter): + from botanu.tracking.llm import llm_instrumented + + @llm_instrumented(provider="openai") + def fake_completion(prompt, model="gpt-4"): + class _Usage: + prompt_tokens = 10 + completion_tokens = 20 + + class _Response: + usage = _Usage() + + return _Response() + + result = fake_completion("Hello", model="gpt-4") + assert result is not None + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.PROVIDER_NAME] == "openai" + assert attrs[GenAIAttributes.REQUEST_MODEL] == "gpt-4" + assert attrs[GenAIAttributes.USAGE_INPUT_TOKENS] == 10 + assert attrs[GenAIAttributes.USAGE_OUTPUT_TOKENS] == 20 + + def test_decorator_with_streaming(self, memory_exporter): + from botanu.tracking.llm import BotanuAttributes, llm_instrumented + + @llm_instrumented(provider="anthropic") + def fake_stream(prompt, model="claude-3", stream=False): + return "streamed" + + fake_stream("Hi", model="claude-3", stream=True) + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[BotanuAttributes.STREAMING] is True + + def test_decorator_without_usage(self, memory_exporter): + from botanu.tracking.llm import llm_instrumented + + @llm_instrumented(provider="custom", tokens_from_response=False) + def no_usage_fn(prompt, model="custom-model"): + return "done" + + no_usage_fn("test", model="custom-model") + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert GenAIAttributes.USAGE_INPUT_TOKENS not in attrs + + +class TestClientRequestId: + """Tests for client_request_id passthrough.""" + + def test_client_request_id_on_track_llm_call(self, memory_exporter): + from botanu.tracking.llm import BotanuAttributes + + with track_llm_call( + model="gpt-4", + provider="openai", + 
client_request_id="cli-req-001", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[BotanuAttributes.CLIENT_REQUEST_ID] == "cli-req-001" + + +class TestKwargsPassthrough: + """Tests for additional kwargs passed to track_llm_call.""" + + def test_custom_kwargs(self, memory_exporter): + with track_llm_call( + model="gpt-4", + provider="openai", + deployment_id="dep-001", + ): + pass + + spans = memory_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs["botanu.deployment_id"] == "dep-001" diff --git a/tests/unit/test_middleware.py b/tests/unit/test_middleware.py new file mode 100644 index 0000000..b41b838 --- /dev/null +++ b/tests/unit/test_middleware.py @@ -0,0 +1,175 @@ +# SPDX-FileCopyrightText: 2026 The Botanu Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for BotanuMiddleware (FastAPI/Starlette).""" + +from __future__ import annotations + +import pytest +from opentelemetry import context as otel_context +from starlette.applications import Starlette +from starlette.responses import JSONResponse +from starlette.routing import Route +from starlette.testclient import TestClient + +from botanu.sdk.middleware import BotanuMiddleware + + +def _make_app(*, use_case: str = "test_uc", workflow: str | None = None, auto_generate_run_id: bool = True): + """Build a minimal Starlette app with BotanuMiddleware.""" + + async def homepage(request): + return JSONResponse({"ok": True}) + + app = Starlette(routes=[Route("/", homepage)]) + app.add_middleware( + BotanuMiddleware, + use_case=use_case, + workflow=workflow, + auto_generate_run_id=auto_generate_run_id, + ) + return app + + +@pytest.fixture(autouse=True) +def _clean_otel_context(): + """Reset OTel context before each middleware test to avoid baggage leaking.""" + token = otel_context.attach(otel_context.Context()) + yield + otel_context.detach(token) + + +class TestBotanuMiddleware: + """Tests for BotanuMiddleware dispatch 
behaviour.""" + + def test_response_contains_use_case_header(self, memory_exporter): + client = TestClient(_make_app(use_case="billing")) + resp = client.get("/") + assert resp.status_code == 200 + assert resp.headers["x-botanu-use-case"] == "billing" + + def test_response_contains_workflow_header(self, memory_exporter): + client = TestClient(_make_app(use_case="billing", workflow="invoice_flow")) + resp = client.get("/") + assert resp.headers["x-botanu-workflow"] == "invoice_flow" + + def test_auto_generated_run_id_in_response(self, memory_exporter): + client = TestClient(_make_app()) + resp = client.get("/") + run_id = resp.headers.get("x-botanu-run-id") + assert run_id is not None + assert len(run_id) > 0 + + def test_run_id_propagated_from_header(self, memory_exporter): + client = TestClient(_make_app()) + resp = client.get("/", headers={"x-botanu-run-id": "my-custom-run-123"}) + assert resp.headers["x-botanu-run-id"] == "my-custom-run-123" + + def test_use_case_propagated_from_header(self, memory_exporter): + client = TestClient(_make_app(use_case="default_uc")) + resp = client.get("/", headers={"x-botanu-use-case": "overridden_uc"}) + assert resp.headers["x-botanu-use-case"] == "overridden_uc" + + def test_workflow_propagated_from_header(self, memory_exporter): + client = TestClient(_make_app(use_case="uc", workflow="default_wf")) + resp = client.get("/", headers={"x-botanu-workflow": "overridden_wf"}) + assert resp.headers["x-botanu-workflow"] == "overridden_wf" + + def test_no_auto_run_id_when_disabled(self, memory_exporter): + client = TestClient(_make_app(auto_generate_run_id=False)) + resp = client.get("/") + # Should not have a run_id header since none was provided and auto-gen is off + assert "x-botanu-run-id" not in resp.headers + + def test_workflow_defaults_to_use_case(self, memory_exporter): + client = TestClient(_make_app(use_case="my_uc")) + resp = client.get("/") + assert resp.headers["x-botanu-workflow"] == "my_uc" + + def 
test_customer_id_propagated_from_header(self, memory_exporter): + client = TestClient(_make_app()) + resp = client.get("/", headers={"x-botanu-customer-id": "cust-456"}) + assert resp.status_code == 200 + + def test_each_request_gets_unique_run_id(self, memory_exporter): + client = TestClient(_make_app()) + resp1 = client.get("/") + resp2 = client.get("/") + run_id1 = resp1.headers.get("x-botanu-run-id") + run_id2 = resp2.headers.get("x-botanu-run-id") + assert run_id1 != run_id2 + + +class TestMiddlewareBaggageIsolation: + """Tests for baggage context isolation between requests.""" + + def test_baggage_does_not_leak_between_requests(self, memory_exporter): + """Baggage set in request 1 must not appear in request 2.""" + + app_with_baggage_check = _make_baggage_check_app() + client = TestClient(app_with_baggage_check) + + # Request 1: sends a custom run_id + resp1 = client.get("/check", headers={"x-botanu-run-id": "leak-test-001"}) + resp1.json() + + # Request 2: no custom run_id + resp2 = client.get("/check") + data2 = resp2.json() + + # Request 2 should NOT see request 1's run_id in baggage + assert data2.get("run_id") != "leak-test-001" + + def test_header_priority_over_constructor_defaults(self, memory_exporter): + """x-botanu-use-case header should override constructor default.""" + client = TestClient(_make_app(use_case="default_uc")) + resp = client.get("/", headers={"x-botanu-use-case": "header_uc"}) + assert resp.headers["x-botanu-use-case"] == "header_uc" + + def test_multiple_headers_propagated(self, memory_exporter): + """All x-botanu-* headers should be propagated together.""" + client = TestClient(_make_app(use_case="uc")) + resp = client.get( + "/", + headers={ + "x-botanu-run-id": "multi-001", + "x-botanu-use-case": "multi-uc", + "x-botanu-workflow": "multi-wf", + "x-botanu-customer-id": "cust-multi", + }, + ) + assert resp.headers["x-botanu-run-id"] == "multi-001" + assert resp.headers["x-botanu-use-case"] == "multi-uc" + assert 
resp.headers["x-botanu-workflow"] == "multi-wf" + + def test_exception_in_handler_still_detaches_context(self, memory_exporter): + """Context token should be detached even when handler raises.""" + app = _make_error_app() + client = TestClient(app, raise_server_exceptions=False) + resp = client.get("/error") + assert resp.status_code == 500 + + +def _make_baggage_check_app(): + """Build app that returns current baggage values.""" + from opentelemetry import baggage as otel_baggage + from opentelemetry.context import get_current + + async def check_baggage(request): + run_id = otel_baggage.get_baggage("botanu.run_id", context=get_current()) + return JSONResponse({"run_id": run_id}) + + app = Starlette(routes=[Route("/check", check_baggage)]) + app.add_middleware(BotanuMiddleware, use_case="test") + return app + + +def _make_error_app(): + """Build app that raises an exception in the handler.""" + + async def error_handler(request): + raise RuntimeError("Intentional test error") + + app = Starlette(routes=[Route("/error", error_handler)]) + app.add_middleware(BotanuMiddleware, use_case="error_test") + return app diff --git a/tests/unit/test_resource_detector.py b/tests/unit/test_resource_detector.py index 7ec32b8..dad4d3d 100644 --- a/tests/unit/test_resource_detector.py +++ b/tests/unit/test_resource_detector.py @@ -267,3 +267,189 @@ def test_include_process_only(self): ) assert "process.pid" in attrs assert "host.name" not in attrs + + +class TestAWSAvailabilityZone: + """Tests for _get_aws_availability_zone.""" + + def test_returns_none_for_lambda(self): + from botanu.resources.detector import _get_aws_availability_zone + + with mock.patch.dict(os.environ, {"AWS_LAMBDA_FUNCTION_NAME": "fn"}): + assert _get_aws_availability_zone() is None + + def test_returns_none_when_metadata_disabled(self): + from botanu.resources.detector import _get_aws_availability_zone + + with mock.patch.dict(os.environ, {"AWS_EC2_METADATA_DISABLED": "true"}, clear=True): + 
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + assert _get_aws_availability_zone() is None + + def test_returns_none_when_invalid_endpoint(self): + from botanu.resources.detector import _get_aws_availability_zone + + with mock.patch.dict( + os.environ, + { + "AWS_EC2_METADATA_SERVICE_ENDPOINT": "not-a-url", + }, + clear=True, + ): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + assert _get_aws_availability_zone() is None + + def test_returns_none_on_network_error(self): + from botanu.resources.detector import _get_aws_availability_zone + + with mock.patch.dict(os.environ, {}, clear=True): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + os.environ.pop("AWS_EC2_METADATA_DISABLED", None) + # Default endpoint (169.254.169.254) will fail in test env + result = _get_aws_availability_zone() + assert result is None + + +class TestCloudRegionFromAZ: + """Tests for cloud region derivation from availability zone.""" + + def test_region_derived_from_az(self): + """When AZ is 'us-east-1a', region should be 'us-east-1'.""" + + with mock.patch.dict( + os.environ, + { + "AWS_REGION": "", + "AWS_DEFAULT_REGION": "", + "AWS_ACCOUNT_ID": "123456789012", + }, + clear=True, + ): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + + # Mock the IMDS call to return an AZ + with mock.patch( + "botanu.resources.detector._get_aws_availability_zone", + return_value="us-west-2c", + ): + attrs = detect_cloud_provider() + if "cloud.availability_zone" in attrs: + assert attrs["cloud.region"] == "us-west-2" + + +class TestContainerId: + """Tests for container ID extraction.""" + + def test_container_id_from_env(self): + from botanu.resources.detector import _get_container_id + + # Short container IDs (< 12 chars) are ignored + with mock.patch.dict(os.environ, {"CONTAINER_ID": "short"}, clear=True): + os.environ.pop("HOSTNAME", None) + result = _get_container_id() + assert result is None + + # Long enough IDs are returned + with mock.patch.dict(os.environ, {"CONTAINER_ID": 
"abcdef123456"}, clear=True): + os.environ.pop("HOSTNAME", None) + result = _get_container_id() + # May be overridden by cgroup parsing, but at minimum not None + assert result is None or len(result) >= 12 + + +class TestDetectHostExtended: + """Extended host detection tests.""" + + def test_host_id_from_env(self): + with mock.patch.dict(os.environ, {"HOST_ID": "i-0123456789"}): + attrs = detect_host() + assert attrs["host.id"] == "i-0123456789" + + def test_host_id_from_instance_id(self): + with mock.patch.dict(os.environ, {"INSTANCE_ID": "vm-abc"}, clear=True): + os.environ.pop("HOST_ID", None) + attrs = detect_host() + assert attrs["host.id"] == "vm-abc" + + def test_host_id_falls_back_to_hostname(self): + with mock.patch.dict(os.environ, {}, clear=True): + os.environ.pop("HOST_ID", None) + os.environ.pop("INSTANCE_ID", None) + attrs = detect_host() + assert attrs.get("host.id") == attrs.get("host.name") + + +class TestDetectServerlessExtended: + """Extended serverless detection tests.""" + + def test_gcp_cloud_function(self): + with mock.patch.dict( + os.environ, + { + "FUNCTION_NAME": "my-function", + "FUNCTION_TARGET": "handle_event", + }, + clear=True, + ): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + os.environ.pop("K_SERVICE", None) + attrs = detect_serverless() + assert attrs["faas.name"] == "my-function" + assert attrs["faas.trigger"] == "handle_event" + + def test_azure_functions(self): + with mock.patch.dict( + os.environ, + { + "WEBSITE_SITE_NAME": "my-azure-fn", + "WEBSITE_INSTANCE_ID": "inst-123", + }, + clear=True, + ): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + os.environ.pop("K_SERVICE", None) + os.environ.pop("FUNCTION_NAME", None) + attrs = detect_serverless() + assert attrs["faas.name"] == "my-azure-fn" + assert attrs["faas.instance"] == "inst-123" + + def test_no_serverless_detected(self): + with mock.patch.dict(os.environ, {}, clear=True): + os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + os.environ.pop("K_SERVICE", 
None) + os.environ.pop("FUNCTION_NAME", None) + os.environ.pop("WEBSITE_SITE_NAME", None) + attrs = detect_serverless() + assert attrs == {} + + +class TestDetectProcessExtended: + """Extended process detection tests.""" + + def test_process_command(self): + attrs = detect_process() + assert "process.command" in attrs + assert isinstance(attrs["process.command"], str) + + def test_process_runtime_version_format(self): + attrs = detect_process() + version = attrs["process.runtime.version"] + parts = version.split(".") + assert len(parts) >= 2 # major.minor at minimum + + +class TestServiceInstanceId: + """Tests for service.instance.id derivation in detect_all_resources.""" + + def test_instance_id_from_hostname_in_k8s(self): + detect_all_resources.cache_clear() + with mock.patch.dict( + os.environ, + { + "KUBERNETES_SERVICE_HOST": "10.0.0.1", + "HOSTNAME": "my-pod-abc123xyz", + }, + ): + attrs = detect_all_resources() + # Should have service.instance.id + assert "service.instance.id" in attrs + detect_all_resources.cache_clear()