Description
Sub-issue of getaxonflow/axonflow-enterprise#1128.
Summary
Add an `AxonFlowChatModel` class (and a companion `AxonFlowRunnableBinding`) to `axonflow.adapters` that wraps any LangChain `BaseChatModel` with AxonFlow pre-check and audit calls, acting as a transparent drop-in replacement in LangGraph and LangChain pipelines.
Proposed design
Two-class architecture
```
axonflow.adapters.langchain
├── AxonFlowChatModel(BaseChatModel)          # plain model usage
└── AxonFlowRunnableBinding(RunnableBinding)  # tool-bound / structured-output usage
```
`AxonFlowChatModel` subclasses `BaseChatModel`, so it satisfies `isinstance` checks and is a genuine drop-in:

```python
from axonflow.adapters import AxonFlowChatModel
from langchain_anthropic import ChatAnthropic

model = AxonFlowChatModel(
    wrapped=ChatAnthropic(model_name="claude-sonnet-4-6"),
    axonflow=client,
)
# use exactly like ChatAnthropic: in graphs, with bind_tools, etc.
```

Governance lives in `_agenerate` (async) and is absent from `_generate` (sync), since the pre-check and audit APIs are async-only. The sync gap should be documented explicitly.
`AxonFlowRunnableBinding` subclasses `RunnableBinding` and is returned by `bind_tools` and `with_structured_output`. Governance lives in `ainvoke`/`astream`.
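The governance flow in `ainvoke` can be sketched without any LangChain or AxonFlow dependencies. This is a minimal illustration using stand-in stubs; the client method names (`precheck`, `audit`) and their signatures are assumptions, not the real AxonFlow API:

```python
import asyncio

class StubAxonFlowClient:
    """Stand-in for the AxonFlow client; records the order of governance calls."""

    def __init__(self):
        self.calls = []

    async def precheck(self, prompt, user_token=None):
        self.calls.append("precheck")
        return True  # a real client would deny non-compliant requests

    async def audit(self, response, user_token=None):
        self.calls.append("audit")

class StubBoundModel:
    """Stand-in for the bound runnable (the wrapped chat model)."""

    async def ainvoke(self, input):
        return f"echo: {input}"

class GovernedBinding:
    """Pre-check before, and audit after, the wrapped bound runnable."""

    def __init__(self, bound, axonflow, user_token=None):
        self.bound = bound
        self.axonflow = axonflow
        self.user_token = user_token

    async def ainvoke(self, input):
        allowed = await self.axonflow.precheck(input, user_token=self.user_token)
        if not allowed:
            raise PermissionError("AxonFlow pre-check denied the request")
        result = await self.bound.ainvoke(input)
        await self.axonflow.audit(result, user_token=self.user_token)
        return result

client = StubAxonFlowClient()
binding = GovernedBinding(StubBoundModel(), client, user_token="user-123")
out = asyncio.run(binding.ainvoke("hello"))
print(out)           # echo: hello
print(client.calls)  # ['precheck', 'audit']
```

The real implementation would delegate to `RunnableBinding`'s invocation machinery rather than calling `self.bound.ainvoke` directly, but the ordering (pre-check, model call, audit) is the same.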
bind_tools
Delegates to `self.wrapped.bind_tools(tools, **kwargs)` and passes the result directly as `bound` to `AxonFlowRunnableBinding`:

```python
def bind_tools(self, tools, **kwargs) -> AxonFlowRunnableBinding:
    bound = self.wrapped.bind_tools(tools, **kwargs)  # -> RunnableBinding
    return AxonFlowRunnableBinding(
        bound=bound,
        axonflow=self.axonflow,
        user_token=self.user_token,
    )
```

When `AxonFlowRunnableBinding.ainvoke` is called, `RunnableBinding`'s invocation machinery calls `self.bound.ainvoke(input)`, i.e. the original `RunnableBinding` from `bind_tools`, which routes to the underlying model with the tools already merged in. No internal field access is needed.
with_structured_output
`BaseChatModel.with_structured_output` raises `NotImplementedError` and must be overridden. Delegating to `self.wrapped.with_structured_output()` bypasses governance. The fix is to inspect the returned `RunnableSequence` and surgically replace `steps[0]` (the model call) with an `AxonFlowRunnableBinding`, preserving the output parser:

```python
def with_structured_output(self, schema, **kwargs):
    result = self.wrapped.with_structured_output(schema, **kwargs)
    if isinstance(result, RunnableSequence) and isinstance(result.steps[0], RunnableBinding):
        governed = AxonFlowRunnableBinding(
            bound=result.steps[0],
            axonflow=self.axonflow,
            user_token=self.user_token,
        )
        return RunnableSequence(governed, *result.steps[1:])
    # fallback for non-standard implementations
    return AxonFlowRunnableBinding(bound=result, axonflow=self.axonflow, user_token=self.user_token)
```

This also recovers token usage: `AxonFlowRunnableBinding.ainvoke` sees the `AIMessage` before the output parser transforms it, so `result.usage_metadata` is available for the audit call.
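Why wrapping `steps[0]` recovers token usage can be shown with a stripped-down sequence: the governed step sees the raw message (with usage metadata) before the parser turns it into the structured result. All classes here are illustrative stand-ins, not LangChain types:

```python
import asyncio
import json

class FakeAIMessage:
    """Stand-in for an AIMessage carrying usage_metadata."""

    def __init__(self, content, usage_metadata):
        self.content = content
        self.usage_metadata = usage_metadata

class FakeModelStep:
    """Stand-in for steps[0], the model call inside the sequence."""

    async def ainvoke(self, input):
        return FakeAIMessage('{"answer": 42}',
                             {"input_tokens": 7, "output_tokens": 3})

class GovernedStep:
    """Wraps the model step; the audit call runs before the parser."""

    def __init__(self, bound, audit_log):
        self.bound = bound
        self.audit_log = audit_log

    async def ainvoke(self, input):
        msg = await self.bound.ainvoke(input)
        self.audit_log.append(msg.usage_metadata)  # usage still available here
        return msg

async def run_sequence(input, audit_log):
    # governed model step, then the output parser (steps[1:])
    msg = await GovernedStep(FakeModelStep(), audit_log).ainvoke(input)
    return json.loads(msg.content)

audit = []
result = asyncio.run(run_sequence("question", audit))
print(result)  # {'answer': 42}
print(audit)   # [{'input_tokens': 7, 'output_tokens': 3}]
```

Had governance wrapped the whole sequence instead, it would only see the parsed dict, and the usage metadata would be gone.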
Other with_* methods
| Method | Behaviour | Action needed |
|---|---|---|
| `with_config` | Returns `RunnableBinding(bound=AxonFlowChatModel)`; governance fires automatically | None |
| `bind(**kwargs)` | Same as above | None |
| `\|` operator | `AxonFlowChatModel` is `steps[0]`; governance fires | None |
| `configurable_fields` / `configurable_alternatives` | Resolves to `AxonFlowChatModel` at invocation; governance fires | None |
| `with_retry` | Without intervention, pre-check and audit fire on every retry attempt; should pre-check once before the retry loop and audit once on the final outcome | Override to wrap `self.wrapped.with_retry(...)` in `AxonFlowRunnableBinding`, hoisting governance outside the retry loop |
| `with_fallbacks` | Governance fires on the primary; fallbacks are plain models and bypass governance entirely | Override to require/auto-wrap fallbacks, or document as a known gap |
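The `with_retry` hoisting described above can be sketched with stubs: pre-check fires once before the retry loop, the wrapped call retries internally, and audit fires once on the final outcome. The class and helper names below are illustrative, not the real API:

```python
import asyncio

class FlakyModel:
    """Stand-in model that fails a fixed number of times, then succeeds."""

    def __init__(self, fail_times):
        self.fail_times = fail_times
        self.attempts = 0

    async def ainvoke(self, input):
        self.attempts += 1
        if self.attempts <= self.fail_times:
            raise RuntimeError("transient provider error")
        return f"ok: {input}"

async def retrying_invoke(model, input, max_attempts=3):
    # stands in for self.wrapped.with_retry(...).ainvoke(input)
    for attempt in range(max_attempts):
        try:
            return await model.ainvoke(input)
        except RuntimeError:
            if attempt == max_attempts - 1:
                raise

class GovernedRetry:
    """Governance hoisted outside the retry loop."""

    def __init__(self, model, log):
        self.model = model
        self.log = log

    async def ainvoke(self, input):
        self.log.append("precheck")  # once, before any attempt
        result = await retrying_invoke(self.model, input)
        self.log.append("audit")     # once, on the final outcome
        return result

log = []
model = FlakyModel(fail_times=2)
out = asyncio.run(GovernedRetry(model, log).ainvoke("hi"))
print(out, model.attempts, log)  # ok: hi 3 ['precheck', 'audit']
```

Three model attempts, but exactly one pre-check and one audit, which is the behaviour the table calls for.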
user_token — per-invocation via RunnableConfig
Storing `user_token` as an instance field prevents sharing the wrapper across concurrent requests. The idiomatic approach is to pass it per-invocation via `RunnableConfig`:

```python
result = await model.ainvoke(
    messages,
    config={"configurable": {"user_token": "user-123"}},
)
```

Inside `_agenerate`, read it from `run_manager.config["configurable"].get("user_token")`, falling back to an instance-level default if set.
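The resolution order (per-invocation config first, instance default second) can be captured in a small helper. This is a sketch; the config shape mirrors `RunnableConfig`'s `"configurable"` dict, and the helper name is hypothetical:

```python
def resolve_user_token(config, instance_default=None):
    """Return the per-invocation user_token, else the instance default."""
    configurable = (config or {}).get("configurable", {})
    return configurable.get("user_token", instance_default)

print(resolve_user_token({"configurable": {"user_token": "user-123"}}))  # user-123
print(resolve_user_token({}, instance_default="svc-default"))            # svc-default
print(resolve_user_token(None))                                          # None
```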
Pydantic serialization
The AxonFlow client holds an open `httpx.AsyncClient` and must not be included in `.model_dump()` or LangGraph checkpoint serialization. Declare it as `Field(exclude=True)` or `PrivateAttr`. `PrivateAttr` is preferred because it removes the field from the schema entirely, but it requires a `model_post_init` hook or an `__init__` override, since Pydantic won't set private attrs from constructor kwargs automatically.
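A minimal Pydantic v2 sketch of the `PrivateAttr` approach, using an `__init__` override to accept the client as a constructor kwarg. The class and stand-in client are illustrative, not the real implementation:

```python
from typing import Any, Optional

from pydantic import BaseModel, PrivateAttr

class FakeAsyncClient:
    """Stand-in for an object holding an open httpx.AsyncClient."""

class AxonFlowChatModelSketch(BaseModel):
    # regular, serializable field
    user_token: Optional[str] = None
    # the client never appears in the schema or in model_dump()
    _axonflow: Any = PrivateAttr(default=None)

    def __init__(self, *, axonflow=None, **kwargs):
        # Pydantic won't set private attrs from constructor kwargs,
        # so pop the client out and assign it after validation.
        super().__init__(**kwargs)
        self._axonflow = axonflow

client = FakeAsyncClient()
m = AxonFlowChatModelSketch(axonflow=client, user_token="user-123")
print(m.model_dump())         # {'user_token': 'user-123'}, no client leaked
print(m._axonflow is client)  # True
```

The same safety can be had with `axonflow: Any = Field(exclude=True)`, which keeps normal constructor handling but leaves the field visible in the schema.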
Files to create/modify
- New: `axonflow/adapters/langchain.py` containing `AxonFlowChatModel` and `AxonFlowRunnableBinding`
- Update: `axonflow/adapters/__init__.py` to export both classes
- New: `tests/adapters/test_langchain.py`
Known limitations / out of scope for v1
- `_generate` (sync path): no governance; document explicitly
- `with_fallbacks` fallback governance: document as a known gap, address in a follow-up
- Streaming audit for `astream`: audit fires after the final chunk; token usage may not be available from all providers mid-stream