-
Notifications
You must be signed in to change notification settings - Fork 18
feat: add otel plugin (#371) #417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| """Demonstrates handler execution without any durable operations.""" | ||
|
|
||
| from typing import Any | ||
|
|
||
| from opentelemetry import trace | ||
|
|
||
| from aws_durable_execution_sdk_python import StepContext | ||
| from aws_durable_execution_sdk_python.config import Duration, StepConfig, StepSemantics | ||
| from aws_durable_execution_sdk_python.context import ( | ||
| DurableContext, | ||
| durable_step, | ||
| durable_with_child_context, | ||
| ) | ||
| from aws_durable_execution_sdk_python.execution import durable_execution | ||
|
|
||
| from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin | ||
|
|
||
|
|
||
| # use default provider | ||
| tracer_provider = trace.get_tracer_provider() | ||
| otel = DurableExecutionOtelPlugin(tracer_provider) | ||
|
|
||
|
|
||
| @durable_step | ||
| def add_numbers(_step_context: StepContext, a: int, b: int) -> int: | ||
| return a + b | ||
|
|
||
|
|
||
| @durable_with_child_context | ||
| def add_numbers_in_child(child_context: DurableContext, a: int, b: int): | ||
| result: int = child_context.step( | ||
| add_numbers(a, b), | ||
| name=f"step-{b}", | ||
| ) | ||
| child_context.wait( | ||
| Duration.from_seconds(1), | ||
| name=f"wait-{b}", | ||
| ) | ||
| return result | ||
|
|
||
|
|
||
| @durable_execution(plugins=[otel]) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for follow up in another CR, what are the other setup steps we need for this plugin? |
||
| def handler(_event: Any, context: DurableContext) -> int: | ||
| result = 0 | ||
| for i in range(3): | ||
| result += context.run_in_child_context( | ||
| add_numbers_in_child(6, i), | ||
| name=f"context-{i}", | ||
| ) | ||
| return context.step( | ||
| add_numbers(result, 2), | ||
| name="final-step", | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| """Tests for step example.""" | ||
|
|
||
| import pytest | ||
| from aws_durable_execution_sdk_python.execution import InvocationStatus | ||
|
|
||
| from src.plugin import execution_with_otel | ||
| from test.conftest import deserialize_operation_payload | ||
|
|
||
|
|
||
| @pytest.mark.example | ||
| @pytest.mark.durable_execution( | ||
| handler=execution_with_otel.handler, | ||
| lambda_function_name="Otel Plugin", | ||
| ) | ||
| def test_plugin(durable_runner): | ||
| """Test basic step example.""" | ||
| with durable_runner: | ||
| result = durable_runner.run(input="{}", timeout=10) | ||
|
|
||
| assert result.status is InvocationStatus.SUCCEEDED | ||
| assert deserialize_operation_payload(result.result) == 23 | ||
|
|
||
| step_result = result.get_step("final-step") | ||
| assert deserialize_operation_payload(step_result.result) == 23 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,8 @@ dependencies = [ | |
| "aws-durable-execution-sdk-python>=1.5.0", | ||
| "opentelemetry-api>=1.20.0", | ||
| "opentelemetry-sdk>=1.20.0", | ||
| "opentelemetry-exporter-otlp", | ||
| "opentelemetry-propagator-aws-xray", | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if these dependencies are required. If not, we can add them to the dependencies of example package |
||
| ] | ||
|
|
||
| [project.urls] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,24 @@ | ||
| """OpenTelemetry instrumentation for AWS Lambda Durable Executions Python SDK.""" | ||
|
|
||
| from aws_durable_execution_sdk_python_otel.__about__ import __version__ | ||
| from aws_durable_execution_sdk_python_otel.context_extractors import ( | ||
| ContextExtractor, | ||
| w3c_client_context_extractor, | ||
| xray_context_extractor, | ||
| ) | ||
| from aws_durable_execution_sdk_python_otel.deterministic_id_generator import ( | ||
| DeterministicIdGenerator, | ||
| ) | ||
| from aws_durable_execution_sdk_python_otel.plugin import ( | ||
| DurableExecutionOtelPlugin, | ||
| ) | ||
|
|
||
|
|
||
| __all__ = [ | ||
| "__version__", | ||
| "ContextExtractor", | ||
| "DeterministicIdGenerator", | ||
| "DurableExecutionOtelPlugin", | ||
| "w3c_client_context_extractor", | ||
| "xray_context_extractor", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| """Context extractors for propagating trace context into durable executions.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import os | ||
| from typing import TYPE_CHECKING, Callable | ||
|
|
||
| from opentelemetry import context as otel_context, propagate | ||
|
|
||
|
|
||
| if TYPE_CHECKING: | ||
| from opentelemetry.context import Context | ||
|
|
||
| from aws_durable_execution_sdk_python.plugin import InvocationStartInfo | ||
|
|
||
| ContextExtractor = Callable[["InvocationStartInfo"], "Context"] | ||
|
|
||
|
|
||
| def xray_context_extractor(info: "InvocationStartInfo") -> "Context": | ||
| """Read the X-Ray trace header from the _X_AMZN_TRACE_ID environment variable. | ||
| The durable execution backend propagates the same Root trace ID to every | ||
| invocation, so all invocations share one traceId. | ||
| """ | ||
| trace_header = os.environ.get("_X_AMZN_TRACE_ID") | ||
| if not trace_header: | ||
| return otel_context.get_current() | ||
| return propagate.extract( | ||
| carrier={"X-Amzn-Trace-Id": trace_header}, | ||
| context=otel_context.get_current(), | ||
| ) | ||
|
|
||
|
|
||
| def w3c_client_context_extractor(info: "InvocationStartInfo") -> "Context": | ||
| """Read W3C traceparent from context.clientContext.custom.traceparent. | ||
| Requires the backend clientContext propagation to be enabled. | ||
| This extractor is a placeholder for when backend propagation is supported. | ||
| """ | ||
| return otel_context.get_current() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| """Deterministic ID generator for OpenTelemetry spans in durable executions.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import hashlib | ||
| import os | ||
| import re | ||
| from datetime import datetime, UTC | ||
|
|
||
| from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator | ||
|
|
||
| HASH_LENGTH = 16 | ||
| HASHED_ID_PATTERN = re.compile(r"^[0-9a-f]{16}$") | ||
|
|
||
|
|
||
| def _parse_xray_root_trace_id(trace_header: str | None) -> str | None: | ||
| """Parse the Root trace ID from an X-Ray trace header string. | ||
|
|
||
| The header format is: | ||
| Root=1-<8 hex>-<24 hex>;Parent=<16 hex>;Sampled=0|1 | ||
|
|
||
| Returns the root value (e.g. "1-5759e988-bd862e3fe1be46a994272793") | ||
| or None if the header is missing or malformed. | ||
| """ | ||
| if not trace_header: | ||
| return None | ||
| match = re.search(r"Root=(1-[0-9a-fA-F]{8}-[0-9a-fA-F]{24})", trace_header) | ||
| return match.group(1) if match else None | ||
|
|
||
|
|
||
| def _xray_trace_id_to_otel(xray_trace_id: str) -> int: | ||
| """Convert an X-Ray trace ID to the W3C/OpenTelemetry 32-char hex format. | ||
|
|
||
| X-Ray format: "1-<8hex>-<24hex>" (36 chars with prefix and dashes) | ||
| OTel format: "<8hex><24hex>" (32 lowercase hex chars) | ||
| """ | ||
| otel_id = xray_trace_id.replace("1-", "", 1).replace("-", "").lower() | ||
| return int(otel_id, 16) | ||
|
|
||
|
|
||
| def _to_otel_trace_id(execution_arn: str, start_timestamp: datetime | None) -> int: | ||
| """Build an OTel-compatible trace ID (128 bits) | ||
|
|
||
| First attempts to read the trace ID from the _X_AMZN_TRACE_ID environment | ||
| variable that Lambda populates on each invocation. This ties the durable | ||
| execution spans to the same trace that X-Ray is already tracking. | ||
|
|
||
| Falls back to generating a deterministic trace ID from the execution ARN | ||
| and timestamp when the environment variable is not set (e.g. in tests or | ||
| non-Lambda environments). | ||
| """ | ||
| env_trace_id = _parse_xray_root_trace_id(os.environ.get("_X_AMZN_TRACE_ID")) | ||
| if env_trace_id: | ||
| return _xray_trace_id_to_otel(env_trace_id) | ||
|
|
||
| # Fallback: deterministic ID from execution ARN + timestamp | ||
| time_part = format(int((start_timestamp or datetime.now(UTC)).timestamp()), "08x") | ||
| hash_part = hashlib.blake2b(execution_arn.encode()).hexdigest()[:24] # noqa: S324 | ||
| return int(f"{time_part}{hash_part}", 16) | ||
|
|
||
|
|
||
| def operation_id_to_span_id(operation_id: str) -> int: | ||
| """Derive a deterministic span ID (64 bits) from an operation ID.""" | ||
| hashed_operation_id = hashlib.blake2b(operation_id.encode()).hexdigest()[:16] | ||
| return int(hashed_operation_id, 16) | ||
|
|
||
|
|
||
| class DeterministicIdGenerator(IdGenerator): | ||
| """An ID generator that produces deterministic span IDs when a pending | ||
| operation ID is set, and random IDs otherwise. | ||
|
|
||
| Trace IDs are deterministic when an execution ARN is set, ensuring all | ||
| invocations of the same durable execution share a single trace. | ||
|
|
||
| Trace IDs embed a real timestamp so they satisfy the X-Ray format | ||
| requirement (first 8 hex chars = Unix epoch seconds). | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| self._next_span_id: int | None = None | ||
| self._execution_trace_id: int | None = None | ||
| self._random_id_generator = RandomIdGenerator() | ||
|
|
||
| def set_next_span_id(self, span_id: int | None) -> None: | ||
| """Set the operation ID to use for the next span's ID. | ||
|
|
||
| After one span is created, it resets to random. | ||
| """ | ||
| self._next_span_id = span_id | ||
|
|
||
| def set_trace_id( | ||
| self, execution_arn: str, start_timestamp: datetime | None | ||
| ) -> None: | ||
| """Compute and cache the deterministic trace ID for this execution. | ||
|
|
||
| Args: | ||
| execution_arn: The durable execution ARN (used for the hash portion). | ||
| start_timestamp: start time of invocation | ||
| """ | ||
| self._execution_trace_id = _to_otel_trace_id(execution_arn, start_timestamp) | ||
|
|
||
| def generate_trace_id(self) -> int: | ||
| """Generate a 128-bit trace ID.""" | ||
| return self._execution_trace_id or self._random_id_generator.generate_trace_id() | ||
|
|
||
| def generate_span_id(self) -> int: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method could be called concurrently so set then generate pattern may not work well in a multithreading env |
||
| """Generate a 64-bit span ID.""" | ||
| span_id, self._next_span_id = self._next_span_id, None | ||
| return span_id or self._random_id_generator.generate_span_id() | ||
Uh oh!
There was an error while loading. Please reload this page.