Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions packages/aws-durable-execution-sdk-python-examples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import shutil
import subprocess
import sys
import time
import zipfile
Expand Down Expand Up @@ -35,6 +36,7 @@ def build_examples():

build_dir = Path(__file__).parent / "build"
src_dir = Path(__file__).parent / "src"
packages_dir = Path(__file__).parent.parent

logger.info("Building examples...")

Expand All @@ -57,15 +59,29 @@ def build_examples():
logger.exception("Failed to copy testing library")
return False

# Copy SDK source from the main SDK package
testing_src = (
Path(__file__).parent.parent
/ "aws-durable-execution-sdk-python"
/ "src"
/ "aws_durable_execution_sdk_python"
)
logger.info("Copying SDK from %s", testing_src)
shutil.copytree(testing_src, build_dir / "aws_durable_execution_sdk_python")
# Install local packages so their runtime dependencies are included in
# the Lambda deployment package.
runtime_packages = [
packages_dir / "aws-durable-execution-sdk-python",
packages_dir / "aws-durable-execution-sdk-python-otel",
Comment thread
zhongkechen marked this conversation as resolved.
]
try:
subprocess.run(
[
sys.executable,
"-m",
"pip",
"install",
"--upgrade",
"--target",
str(build_dir),
*[str(package) for package in runtime_packages],
],
check=True,
)
except subprocess.CalledProcessError:
logger.exception("Failed to install runtime dependencies")
return False
Comment thread
zhongkechen marked this conversation as resolved.

# Copy example functions
logger.info("Copying examples from %s", src_dir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,17 @@
"ExecutionTimeout": 300
},
"path": "./src/plugin/execution_with_plugin.py"
},
{
"name": "Otel Plugin",
"description": "Test Otel plugin",
"handler": "execution_with_otel.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/plugin/execution_with_otel.py"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Demonstrates handler execution without any durable operations."""

from typing import Any

from opentelemetry import trace

from aws_durable_execution_sdk_python import StepContext
from aws_durable_execution_sdk_python.config import Duration, StepConfig, StepSemantics
from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_step,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution

from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin


# use default provider
tracer_provider = trace.get_tracer_provider()
otel = DurableExecutionOtelPlugin(tracer_provider)


@durable_step
def add_numbers(_step_context: StepContext, a: int, b: int) -> int:
return a + b


@durable_with_child_context
def add_numbers_in_child(child_context: DurableContext, a: int, b: int):
result: int = child_context.step(
add_numbers(a, b),
name=f"step-{b}",
)
child_context.wait(
Duration.from_seconds(1),
name=f"wait-{b}",
)
return result


@durable_execution(plugins=[otel])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for follow up in another CR, what are the other setup steps we need for this plugin?

def handler(_event: Any, context: DurableContext) -> int:
result = 0
for i in range(3):
result += context.run_in_child_context(
add_numbers_in_child(6, i),
name=f"context-{i}",
)
return context.step(
add_numbers(result, 2),
name="final-step",
)
18 changes: 18 additions & 0 deletions packages/aws-durable-execution-sdk-python-examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,24 @@
"ExecutionTimeout": 300
}
}
},
Comment thread
zhongkechen marked this conversation as resolved.
"ExecutionWithOtel": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "execution_with_otel.handler",
"Description": "Test Otel plugin",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Tests for step example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.plugin import execution_with_otel
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=execution_with_otel.handler,
lambda_function_name="Otel Plugin",
)
def test_plugin(durable_runner):
"""Test basic step example."""
with durable_runner:
result = durable_runner.run(input="{}", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == 23

step_result = result.get_step("final-step")
assert deserialize_operation_payload(step_result.result) == 23
2 changes: 2 additions & 0 deletions packages/aws-durable-execution-sdk-python-otel/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies = [
"aws-durable-execution-sdk-python>=1.5.0",
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp",
"opentelemetry-propagator-aws-xray",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if these dependencies are required. If not, we can add them to the dependencies of example package

]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
"""OpenTelemetry instrumentation for AWS Lambda Durable Executions Python SDK."""

from aws_durable_execution_sdk_python_otel.__about__ import __version__
from aws_durable_execution_sdk_python_otel.context_extractors import (
ContextExtractor,
w3c_client_context_extractor,
xray_context_extractor,
)
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
DeterministicIdGenerator,
)
from aws_durable_execution_sdk_python_otel.plugin import (
DurableExecutionOtelPlugin,
)


__all__ = [
"__version__",
"ContextExtractor",
"DeterministicIdGenerator",
"DurableExecutionOtelPlugin",
"w3c_client_context_extractor",
"xray_context_extractor",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Context extractors for propagating trace context into durable executions."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Callable

from opentelemetry import context as otel_context, propagate


if TYPE_CHECKING:
from opentelemetry.context import Context

from aws_durable_execution_sdk_python.plugin import InvocationStartInfo

ContextExtractor = Callable[["InvocationStartInfo"], "Context"]


def xray_context_extractor(info: "InvocationStartInfo") -> "Context":
"""Read the X-Ray trace header from the _X_AMZN_TRACE_ID environment variable.
The durable execution backend propagates the same Root trace ID to every
invocation, so all invocations share one traceId.
"""
trace_header = os.environ.get("_X_AMZN_TRACE_ID")
if not trace_header:
return otel_context.get_current()
return propagate.extract(
carrier={"X-Amzn-Trace-Id": trace_header},
context=otel_context.get_current(),
)


def w3c_client_context_extractor(info: "InvocationStartInfo") -> "Context":
"""Read W3C traceparent from context.clientContext.custom.traceparent.
Requires the backend clientContext propagation to be enabled.
This extractor is a placeholder for when backend propagation is supported.
"""
return otel_context.get_current()
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Deterministic ID generator for OpenTelemetry spans in durable executions."""

from __future__ import annotations

import hashlib
import os
import re
from datetime import datetime, UTC

from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator

HASH_LENGTH = 16
HASHED_ID_PATTERN = re.compile(r"^[0-9a-f]{16}$")


def _parse_xray_root_trace_id(trace_header: str | None) -> str | None:
"""Parse the Root trace ID from an X-Ray trace header string.

The header format is:
Root=1-<8 hex>-<24 hex>;Parent=<16 hex>;Sampled=0|1

Returns the root value (e.g. "1-5759e988-bd862e3fe1be46a994272793")
or None if the header is missing or malformed.
"""
if not trace_header:
return None
match = re.search(r"Root=(1-[0-9a-fA-F]{8}-[0-9a-fA-F]{24})", trace_header)
return match.group(1) if match else None


def _xray_trace_id_to_otel(xray_trace_id: str) -> int:
"""Convert an X-Ray trace ID to the W3C/OpenTelemetry 32-char hex format.

X-Ray format: "1-<8hex>-<24hex>" (36 chars with prefix and dashes)
OTel format: "<8hex><24hex>" (32 lowercase hex chars)
"""
otel_id = xray_trace_id.replace("1-", "", 1).replace("-", "").lower()
return int(otel_id, 16)


def _to_otel_trace_id(execution_arn: str, start_timestamp: datetime | None) -> int:
"""Build an OTel-compatible trace ID (128 bits)

First attempts to read the trace ID from the _X_AMZN_TRACE_ID environment
variable that Lambda populates on each invocation. This ties the durable
execution spans to the same trace that X-Ray is already tracking.

Falls back to generating a deterministic trace ID from the execution ARN
and timestamp when the environment variable is not set (e.g. in tests or
non-Lambda environments).
"""
env_trace_id = _parse_xray_root_trace_id(os.environ.get("_X_AMZN_TRACE_ID"))
if env_trace_id:
return _xray_trace_id_to_otel(env_trace_id)

# Fallback: deterministic ID from execution ARN + timestamp
time_part = format(int((start_timestamp or datetime.now(UTC)).timestamp()), "08x")
hash_part = hashlib.blake2b(execution_arn.encode()).hexdigest()[:24] # noqa: S324
return int(f"{time_part}{hash_part}", 16)


def operation_id_to_span_id(operation_id: str) -> int:
"""Derive a deterministic span ID (64 bits) from an operation ID."""
hashed_operation_id = hashlib.blake2b(operation_id.encode()).hexdigest()[:16]
return int(hashed_operation_id, 16)


class DeterministicIdGenerator(IdGenerator):
"""An ID generator that produces deterministic span IDs when a pending
operation ID is set, and random IDs otherwise.

Trace IDs are deterministic when an execution ARN is set, ensuring all
invocations of the same durable execution share a single trace.

Trace IDs embed a real timestamp so they satisfy the X-Ray format
requirement (first 8 hex chars = Unix epoch seconds).
"""

def __init__(self) -> None:
self._next_span_id: int | None = None
self._execution_trace_id: int | None = None
self._random_id_generator = RandomIdGenerator()

def set_next_span_id(self, span_id: int | None) -> None:
"""Set the operation ID to use for the next span's ID.

After one span is created, it resets to random.
"""
self._next_span_id = span_id

def set_trace_id(
self, execution_arn: str, start_timestamp: datetime | None
) -> None:
"""Compute and cache the deterministic trace ID for this execution.

Args:
execution_arn: The durable execution ARN (used for the hash portion).
start_timestamp: start time of invocation
"""
self._execution_trace_id = _to_otel_trace_id(execution_arn, start_timestamp)

def generate_trace_id(self) -> int:
"""Generate a 128-bit trace ID."""
return self._execution_trace_id or self._random_id_generator.generate_trace_id()

def generate_span_id(self) -> int:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method could be called concurrently so set then generate pattern may not work well in a multithreading env

"""Generate a 64-bit span ID."""
span_id, self._next_span_id = self._next_span_id, None
return span_id or self._random_id_generator.generate_span_id()
Loading