
feat: AxonFlowChatModel — governed BaseChatModel wrapper for LangChain/LangGraph #115

@gzak

Description


Sub-issue of getaxonflow/axonflow-enterprise#1128.

Summary

Add an AxonFlowChatModel class (and a companion AxonFlowRunnableBinding) to axonflow.adapters that wraps any LangChain BaseChatModel with AxonFlow pre-check and audit calls, acting as a transparent drop-in replacement in LangGraph and LangChain pipelines.

Proposed design

Two-class architecture

axonflow.adapters.langchain
├── AxonFlowChatModel(BaseChatModel)         # plain model usage
└── AxonFlowRunnableBinding(RunnableBinding) # tool-bound / structured-output usage

AxonFlowChatModel subclasses BaseChatModel so it satisfies isinstance checks and is a genuine drop-in:

from axonflow.adapters import AxonFlowChatModel
from langchain_anthropic import ChatAnthropic

model = AxonFlowChatModel(
    wrapped=ChatAnthropic(model_name="claude-sonnet-4-6"),
    axonflow=client,
)
# use exactly like ChatAnthropic — in graphs, with bind_tools, etc.

Governance lives in _agenerate (async) and is absent from _generate (sync), since the pre-check and audit APIs are async-only. The sync gap should be documented explicitly.
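The async governance flow can be sketched independently of LangChain. Note that the client method names (`precheck`, `audit`) and the deny-raises behaviour are assumptions for illustration, not confirmed AxonFlow client APIs:

```python
import asyncio

# Hypothetical governance wrapper around the wrapped model's async call.
# `precheck`/`audit` names and the PermissionError on denial are assumptions.
async def governed_agenerate(client, call_wrapped, messages, user_token=None):
    allowed = await client.precheck(messages, user_token=user_token)
    if not allowed:
        raise PermissionError("AxonFlow pre-check denied the request")
    result = await call_wrapped(messages)        # plays the role of wrapped._agenerate
    await client.audit(result, user_token=user_token)
    return result

class _StubClient:
    """Stand-in AxonFlow client that records the governance call order."""
    def __init__(self):
        self.calls = []
    async def precheck(self, messages, user_token=None):
        self.calls.append("precheck")
        return True
    async def audit(self, result, user_token=None):
        self.calls.append("audit")

async def _echo(messages):
    return {"content": messages[-1]}

client = _StubClient()
result = asyncio.run(governed_agenerate(client, _echo, ["hello"]))
```

The key invariant is the ordering: pre-check strictly before the provider call, audit strictly after, with a denial short-circuiting the model call entirely.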

AxonFlowRunnableBinding subclasses RunnableBinding and is returned by bind_tools and with_structured_output. Governance lives in ainvoke/astream.

bind_tools

Delegates to self.wrapped.bind_tools(tools, **kwargs) and passes the result directly as bound to AxonFlowRunnableBinding:

def bind_tools(self, tools, **kwargs) -> AxonFlowRunnableBinding:
    bound = self.wrapped.bind_tools(tools, **kwargs)  # → RunnableBinding
    return AxonFlowRunnableBinding(
        bound=bound,
        axonflow=self.axonflow,
        user_token=self.user_token,
    )

When AxonFlowRunnableBinding.ainvoke is called, RunnableBinding's invocation machinery calls self.bound.ainvoke(input) — i.e. the original RunnableBinding from bind_tools — which routes to the underlying model with tools already merged in. No internal field access needed.
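The delegation chain can be mimicked with a minimal stand-in (the real class would subclass RunnableBinding and call `super().ainvoke`; the governance hook here is purely illustrative):

```python
import asyncio

class GovernedBinding:
    """Minimal stand-in for AxonFlowRunnableBinding: governs, then delegates
    to `bound` (the RunnableBinding produced by wrapped.bind_tools)."""
    def __init__(self, bound, hook):
        self.bound = bound
        self.hook = hook            # records governance events
    async def ainvoke(self, value):
        self.hook("precheck")
        result = await self.bound(value)   # routes to the model, tools merged in
        self.hook("audit")
        return result

events = []

async def bound_call(value):        # plays the role of self.bound.ainvoke
    events.append("model")
    return value.upper()

binding = GovernedBinding(bound_call, events.append)
out = asyncio.run(binding.ainvoke("hi"))
```

Because governance wraps the delegated call rather than reaching into the bound runnable, the same pattern covers any kwargs or tools already merged into `bound`.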

with_structured_output

BaseChatModel.with_structured_output raises NotImplementedError and must be overridden. Delegating to self.wrapped.with_structured_output() bypasses governance. The fix is to inspect the returned RunnableSequence and surgically replace steps[0] (the model call) with an AxonFlowRunnableBinding, preserving the output parser:

def with_structured_output(self, schema, **kwargs):
    result = self.wrapped.with_structured_output(schema, **kwargs)
    if isinstance(result, RunnableSequence) and isinstance(result.steps[0], RunnableBinding):
        governed = AxonFlowRunnableBinding(
            bound=result.steps[0],
            axonflow=self.axonflow,
            user_token=self.user_token,
        )
        return RunnableSequence(governed, *result.steps[1:])
    # fallback for non-standard implementations
    return AxonFlowRunnableBinding(bound=result, axonflow=self.axonflow, user_token=self.user_token)

This also recovers token usage: AxonFlowRunnableBinding.ainvoke sees the AIMessage before the output parser transforms it, so result.usage_metadata is available for the audit call.

Other with_* methods

| Method | Behaviour | Action needed |
| --- | --- | --- |
| with_config | Returns RunnableBinding(bound=AxonFlowChatModel) — governance fires automatically | None |
| bind(**kwargs) | Same as above | None |
| \| operator | AxonFlowChatModel is steps[0] — governance fires | None |
| configurable_fields / configurable_alternatives | Resolves to AxonFlowChatModel at invocation — governance fires | None |
| with_retry | Without intervention, pre-check and audit fire on every retry attempt. Should pre-check once before the retry loop and audit once on the final outcome. | Override to wrap self.wrapped.with_retry(...) in AxonFlowRunnableBinding, hoisting governance outside the retry loop |
| with_fallbacks | Governance fires on the primary; fallbacks are plain models and bypass governance entirely | Override to require/auto-wrap fallbacks, or document as a known gap |
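The intent of the with_retry override — pre-check once, retry only the provider call, audit once on the final outcome — can be sketched as follows (function names are hypothetical):

```python
import asyncio

async def governed_retry(precheck, attempt, audit, max_attempts=3):
    # Pre-check once, outside the retry loop.
    if not await precheck():
        raise PermissionError("AxonFlow pre-check denied the request")
    last_exc = None
    for _ in range(max_attempts):
        try:
            result = await attempt()    # only the provider call is retried
        except Exception as exc:
            last_exc = exc
            continue
        await audit(result)             # audit once, on the final outcome
        return result
    await audit(last_exc)               # audit the terminal failure once
    raise last_exc

calls = {"precheck": 0, "attempt": 0, "audit": 0}

async def precheck():
    calls["precheck"] += 1
    return True

async def flaky():
    calls["attempt"] += 1
    if calls["attempt"] < 3:
        raise RuntimeError("transient provider error")
    return "ok"

async def audit(outcome):
    calls["audit"] += 1

result = asyncio.run(governed_retry(precheck, flaky, audit))
```

Without hoisting, a 3-attempt retry would produce three pre-checks and three audits; with it, exactly one of each regardless of attempt count.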

user_token — per-invocation via RunnableConfig

Storing user_token as an instance field prevents sharing the wrapper across concurrent requests. The idiomatic approach is to pass it per-invocation via RunnableConfig:

result = await model.ainvoke(
    messages,
    config={"configurable": {"user_token": "user-123"}},
)

Inside _agenerate, read it from run_manager.config["configurable"].get("user_token"), falling back to an instance-level default if set.
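A small helper makes the lookup order explicit — the per-invocation value from the standard RunnableConfig shape wins, with the instance field as fallback:

```python
def resolve_user_token(config, default=None):
    """Per-invocation token from config["configurable"], else the instance default."""
    configurable = (config or {}).get("configurable") or {}
    return configurable.get("user_token", default)

# Per-invocation value wins; a missing config falls back to the default.
per_call = resolve_user_token({"configurable": {"user_token": "user-123"}})
fallback = resolve_user_token(None, default="service-account")
```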

Pydantic serialization

The AxonFlow client holds an open httpx.AsyncClient and must not be included in .model_dump() or LangGraph checkpoint serialization. Declare it as Field(exclude=True) or PrivateAttr. PrivateAttr is preferred as it removes the field from the schema entirely, but requires a model_post_init hook or __init__ override since Pydantic won't set private attrs from constructor kwargs automatically.
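A minimal sketch of the PrivateAttr pattern with the `__init__` override (field names here are illustrative; assumes Pydantic v2):

```python
from pydantic import BaseModel, PrivateAttr

class GovernedModel(BaseModel):
    name: str
    # Private attr: excluded from the schema and from model_dump entirely.
    _axonflow: object = PrivateAttr(default=None)

    def __init__(self, axonflow=None, **data):
        super().__init__(**data)
        # Pydantic won't set private attrs from constructor kwargs automatically.
        self._axonflow = axonflow

m = GovernedModel(name="demo", axonflow=object())
dump = m.model_dump()   # the client never reaches serialized output
```

With Field(exclude=True) instead, the field would still appear in the schema and in `model_fields`, which is why PrivateAttr is the cleaner fit for a live httpx client.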

Files to create/modify

  • New: axonflow/adapters/langchain.py — AxonFlowChatModel, AxonFlowRunnableBinding
  • Update: axonflow/adapters/__init__.py — export both classes
  • New: tests/adapters/test_langchain.py

Known limitations / out of scope for v1

  • _generate (sync path) — no governance; document explicitly
  • with_fallbacks fallback governance — document as known gap, address in follow-up
  • Streaming audit for astream — audit fires after final chunk; token usage may not be available from all providers mid-stream
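The streaming limitation can be illustrated with a generic async-generator wrapper: chunks pass through untouched, and the audit fires only once the stream is exhausted (whether usage metadata is present on the final chunk is provider-dependent):

```python
import asyncio

async def governed_astream(chunks, audit):
    """Yield chunks as they arrive; audit once after the final chunk."""
    last = None
    async for chunk in chunks:
        last = chunk
        yield chunk
    await audit(last)   # usage metadata, if any, rides on the final chunk

async def _source():
    for piece in ["Hel", "lo"]:
        yield piece

audited = []

async def _note(last):
    audited.append(last)

async def _collect():
    return [c async for c in governed_astream(_source(), _note)]

received = asyncio.run(_collect())
```

Note the trade-off this encodes: the consumer sees no added latency per chunk, but a client that abandons the stream early never triggers the audit.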
