Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/test_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async def test_cloud():
test_endpoint,
"app-123",
"func-456",
None,
)

# Mock update operation
Expand Down
36 changes: 36 additions & 0 deletions veadk/a2a/remote_ve_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,42 @@ async def _pre_run(self, ctx: InvocationContext) -> None:
# Inject auth token if credential service is available
await self._inject_auth_token(ctx)

# Inject session_id to http headers
if hasattr(self, "_a2a_client") and isinstance(self._a2a_client, BaseClient):
if hasattr(self._a2a_client, "_transport") and hasattr(
self._a2a_client._transport, "httpx_client"
):
# get session_id from InvocationContext
import uuid

parent_name = (
ctx.agent.parent_agent.name
if hasattr(ctx.agent, "parent_agent") and ctx.agent.parent_agent
else self.name
)
user_id = (
ctx.session.user_id
if hasattr(ctx, "session")
and hasattr(ctx.session, "user_id")
and ctx.session.user_id
else "unknown"
)
session_id = (
ctx.session.id
if hasattr(ctx, "session")
and hasattr(ctx.session, "id")
and ctx.session.id
else str(uuid.uuid4())
)
x_session_id = f"{parent_name}_{user_id}_{session_id}"
x_session_id_key = "x-session-id-veadk"
self._a2a_client._transport.httpx_client.headers.update(
{x_session_id_key: x_session_id}
)
logger.debug(
f"a2a client inject {x_session_id_key} to header: {x_session_id}"
)

async def _inject_auth_token(self, ctx: InvocationContext) -> None:
"""Inject authentication token from credential service into the HTTP client.

Expand Down
28 changes: 20 additions & 8 deletions veadk/cloud/cloud_agent_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def deploy(
identity_user_pool_name: str = "",
identity_client_name: str = "",
local_test: bool = False,
enable_session_affinity: bool = False,
) -> CloudApp:
"""Deploys a local agent project to Volcengine FaaS, creating necessary resources.

Expand Down Expand Up @@ -287,15 +288,20 @@ def deploy(
identity_client_name = f"{application_name}-id-cli-{formatted_timestamp()}"

try:
vefaas_application_url, app_id, function_id = self._vefaas_service.deploy(
path=path,
name=application_name,
gateway_name=gateway_name,
gateway_service_name=gateway_service_name,
gateway_upstream_name=gateway_upstream_name,
enable_key_auth=enable_key_auth,
vefaas_application_url, app_id, function_id, affinity_binding_id = (
self._vefaas_service.deploy(
path=path,
name=application_name,
gateway_name=gateway_name,
gateway_service_name=gateway_service_name,
gateway_upstream_name=gateway_upstream_name,
enable_key_auth=enable_key_auth,
enable_session_affinity=enable_session_affinity,
)
)
_ = function_id # for future use
_ = function_id
if affinity_binding_id:
logger.info(f"Session affinity plugin bindng_id: {affinity_binding_id}")

veapig_gateway_id, _, veapig_route_id = (
self._vefaas_service.get_application_route(app_id=app_id)
Expand Down Expand Up @@ -393,6 +399,7 @@ def deploy(
vefaas_application_name=application_name,
vefaas_endpoint=vefaas_application_url,
vefaas_application_id=app_id,
affinity_binding_id=affinity_binding_id,
)
except Exception as e:
raise ValueError(
Expand Down Expand Up @@ -483,3 +490,8 @@ def update_function_code(
raise ValueError(
f"Failed to update agent project on Volcengine FaaS platform. Error: {e}"
)

def disable_session_affinity(self, binding_id: str) -> None:
"""Disable session affinity by deleting plugin binding."""
self._vefaas_service.disable_session_affinity(binding_id)
logger.info(f"Session affinity plugin binding {binding_id} deleted.")
2 changes: 2 additions & 0 deletions veadk/cloud/cloud_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
vefaas_endpoint: str = "",
vefaas_application_id: str = "",
use_agent_card: bool = False,
affinity_binding_id: str | None = None,
):
"""Initializes the CloudApp with VeFaaS application details.

Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(
self.vefaas_application_id = vefaas_application_id
self.vefaas_application_name = vefaas_application_name
self.use_agent_card = use_agent_card
self.affinity_binding_id = affinity_binding_id

# vefaas must be set one of three
if (
Expand Down
51 changes: 50 additions & 1 deletion veadk/integrations/ve_apig/ve_apig.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import time

import json
import volcenginesdkcore
from volcenginesdkapig import APIGApi
from volcenginesdkapig20221112 import APIG20221112Api, UpstreamListForCreateRouteInput
Expand Down Expand Up @@ -347,3 +347,52 @@ def create(
"upstream_id": upstream_id,
"route_ids": route_ids,
}

def create_session_affinity_plugin(self, gateway_id: str) -> str:
"""Create session affinity plugin on gateway. Returns plugin_id."""
response = ve_request(
request_body={
"PluginName": "wasm-session-affinity-pro",
"PluginConfig": "",
"GatewayId": gateway_id,
"Enable": True,
},
action="CreatePlugin",
ak=self.ak,
sk=self.sk,
service="apig",
version="2022-11-12",
region=self.region,
host="open.volcengineapi.com",
)
return response["Result"]["PluginID"]

def bind_session_affinity_plugin(self, service_id: str) -> str:
"""Bind session affinity plugin to service. Returns binding_id."""
plugin_config = json.dumps(
{
"Position": "Header",
"DownstreamSessionKey": "x-session-id-veadk",
"UpstreamHeaders": [],
"FailureModeAllow": False,
}
)
return self.create_plugin_binding(
scope="SERVICE",
target=service_id,
plugin_name="wasm-session-affinity-pro",
plugin_config=plugin_config,
)

def delete_plugin_binding(self, binding_id: str) -> None:
"""Delete a plugin binding by id."""
ve_request(
request_body={"Id": binding_id},
action="DeletePluginBinding",
ak=self.ak,
sk=self.sk,
service="apig",
version="2021-03-03",
region=self.region,
host="open.volcengineapi.com",
)
44 changes: 39 additions & 5 deletions veadk/integrations/ve_faas/ve_faas.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ def deploy(
gateway_service_name: str = "",
gateway_upstream_name: str = "",
enable_key_auth: bool = False,
) -> tuple[str, str, str]:
enable_session_affinity: bool = False,
) -> tuple[str, str, str, str | None]:
"""Deploy an agent project to VeFaaS service.

Args:
Expand All @@ -456,7 +457,7 @@ def deploy(
enable_key_auth (bool, optional): Enable key auth. Defaults to False.

Returns:
tuple[str, str, str]: (url, app_id, function_id)
tuple[str, str, str, str | None]: (url, app_id, function_id, affinity_binding_id)
"""
# Naming check
if "_" in name:
Expand Down Expand Up @@ -508,7 +509,11 @@ def deploy(

logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.")

return url, app_id, function_id
# Enable session affinity plugin
affinity_binding_id = None
if enable_session_affinity:
affinity_binding_id = self._enable_session_affinity(app_id)
return url, app_id, function_id, affinity_binding_id

def _create_image_function(self, function_name: str, image: str):
"""Create function using container image instead of code upload."""
Expand Down Expand Up @@ -678,7 +683,8 @@ def deploy_image(
gateway_name: str = "",
gateway_service_name: str = "",
gateway_upstream_name: str = "",
) -> tuple[str, str, str]:
enable_session_affinity: bool = False,
) -> tuple[str, str, str, str | None]:
"""Deploy application using container image.

Args:
Expand Down Expand Up @@ -767,7 +773,11 @@ def deploy_image(

logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.")

return url, app_id, function_id
# Enable session affinity plugin
affinity_binding_id = None
if enable_session_affinity:
affinity_binding_id = self._enable_session_affinity(app_id)
return url, app_id, function_id, affinity_binding_id

def _get_application_logs(self, app_id: str) -> list[str]:
response = _ = ve_request(
Expand All @@ -786,3 +796,27 @@ def _get_application_logs(self, app_id: str) -> list[str]:
return logs
except Exception as _:
raise ValueError(f"Get application log failed. Response: {response}")

def _enable_session_affinity(self, app_id: str) -> str | None:
"""Enable session affinity for application. Returns binding_id."""
route_info = self.get_application_route(app_id=app_id)
if not route_info:
logger.warning(f"Cannot get route info for app {app_id}, skip affinity")
return None
gateway_id, service_id, _ = route_info
# Create plugin on gateway (ignore if already exists)
try:
self.apig_client.create_session_affinity_plugin(gateway_id)
except Exception:
logger.warning(
f"Create session affinity plugin failed. Gateway ID: {gateway_id}"
)
# Bind plugin to service
binding_id = self.apig_client.bind_session_affinity_plugin(service_id)
logger.info(f"Session affinity enabled, binding_id: {binding_id}")
return binding_id

def disable_session_affinity(self, binding_id: str) -> None:
"""Disable session affinity by deleting plugin binding."""
self.apig_client.delete_plugin_binding(binding_id)
logger.info(f"Session affinity disabled, binding_id: {binding_id}")
20 changes: 20 additions & 0 deletions veadk/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ async def run(
f"Auto create session: {session.id}, user_id: {session.user_id}, app_name: {self.app_name}"
)

import uuid

x_session_id = f"{self.agent.name}_{user_id}_{session_id or uuid.uuid4()}"
self._inject_header_to_tools(x_session_id=x_session_id)
final_output = ""
for converted_message in converted_messages:
try:
Expand Down Expand Up @@ -747,3 +751,19 @@ async def save_session_to_long_term_memory(

await self.long_term_memory.add_session_to_memory(session, kwargs=kwargs)
logger.info(f"Add session `{session.id}` to long term memory.")

def _inject_header_to_tools(self, x_session_id: str):
"""Auto inject header to McpToolset"""
from google.adk.tools.mcp_tool import McpToolset

x_session_id_key = "x-session-id-veadk"
for tool in self.agent.tools:
if isinstance(tool, McpToolset):
original_provider = tool._header_provider
tool._header_provider = lambda ctx, sid=x_session_id: {
**(original_provider(ctx) if original_provider else {}),
x_session_id_key: sid,
}
logger.debug(
f"mcp client inject {x_session_id_key} to McpToolset: {x_session_id}"
)