A tiny async task-tree orchestrator library for Python, behavior-tree inspired and LLM-ready.
- Modular, composable task graph building blocks
- Behavior-tree inspired control flow with explicit success/failure semantics
- Async-first execution with local trace visualization
```python
from tinytasktree import Tree

tree = (
    Tree("HelloWorld")
    .Sequence()
    ._().Function(lambda: "hello")
    .End()
)
```

```python
import os
from dataclasses import dataclass

from tinytasktree import Context, JSON, Result, Tool, Tracer, Tree

@dataclass
class Blackboard:
    messages: list[JSON]
    done: bool = False

def make_messages(b: Blackboard) -> list[JSON]:
    system = {"role": "system", "content": "Use tools when useful, then answer with the tool result."}
    return [system, *b.messages]

class WeatherTool(Tool[Blackboard]):
    NAME = "get_weather"
    DESCRIPTION = "Get mock weather for a city."
    SCHEMA = {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}

    async def execute(self, blackboard: Blackboard, arguments: JSON, context: Context, tracer: Tracer) -> JSON:
        return {"city": arguments["city"], "condition": "sunny", "temperature_c": 25}

def store_llm_message(b: Blackboard, message: JSON, tracer: Tracer) -> None:
    b.messages.append(message)

async def decide_next_step(b: Blackboard, _tracer: Tracer, context: Context) -> Result:
    record = context._last_result.data
    b.done = not bool(record.tool_calls)
    return Result.OK(record.final_output if b.done else None)

api_key = os.getenv("LLM_API_KEY")
base_url = os.getenv("LLM_BASE_URL") or ""
extra_body = {"reasoning": {"enabled": False}}

tree = (
    Tree[Blackboard]("WeatherAgent")
    .While(lambda b: not b.done, max_loop_times=4)
    ._().Sequence()
    ._()._().LLM("deepseek-v4-flash", make_messages, api_key=api_key, base_url=base_url, extra_body=extra_body, tools=[WeatherTool()], on_llm_message=store_llm_message)
    ._()._().Function(decide_next_step)
    .End()
)

async def main():
    blackboard = Blackboard(messages=[{"role": "user", "content": "How is the weather in Tokyo?"}])
    context = Context()
    async with context.using_blackboard(blackboard):
        result = await tree(context)
    print(result.data)
    print(blackboard.messages)
```

- Python 3.13+
- openai-python (only needed for `LLM` nodes)
- A cache store backend is only needed for `Cacher` and `Terminable` nodes
- Minimal, expressive tree builder API
- Async-first execution model
- Leaf / Composite / Decorator nodes built-in
- LLM integration via openai-python
- Store-backed caching and termination signaling
- Trace collection and optional trace storage
- UI trace viewer with HTTP server
Alpha. Expect breaking changes until the API is stabilized.
```bash
uv add tinytasktree
```

or

```bash
pip install tinytasktree
```

Save traces into the same directory that the backend serves, for example:
```python
from tinytasktree import Context, FileTraceStorageHandler

storage = FileTraceStorageHandler(".traces")
context = Context()
async with context.using_blackboard(blackboard):
    result = await tree(context)
trace_id = await storage.save(context.trace_root())
print("Trace URL:", f"http://127.0.0.1:8000/{trace_id}")
```

Then start the backend and UI:
```bash
python -m tinytasktree --httpserver --host 127.0.0.1 --port 8000 --trace-dir .traces
# Visit http://127.0.0.1:8000
```

Notes:
- `--trace-dir .traces` must point to the same directory used by `FileTraceStorageHandler(".traces")`
- Opening `http://127.0.0.1:8000/` lists saved traces in the current trace directory, newest first
- Opening `http://127.0.0.1:8000/<trace_id>` loads a specific trace directly
- Execution model: nodes are awaited; composite nodes control child ordering and concurrency
- Results: nodes return `OK(data)` or `FAIL(data)` and composites propagate or short-circuit
- Blackboard: a per-run data object passed through the tree via `Context`
- Node Reference
- Core APIs (Non-Node)
- Contributing
- License
Node Reference [↑]
Node Result & Status [↑]
- Every node returns a `Result` with a status (`OK` or `FAIL`) and an optional data payload
- Composite nodes typically short-circuit on `FAIL` (e.g., `Sequence`) or on `OK` (e.g., `Selector`)
- Decorators can override or invert status while preserving or transforming data
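
The short-circuit rules above can be illustrated with a small standalone model. This is only a sketch and not tinytasktree's implementation: the `Result` dataclass, `run_sequence`, and `run_selector` are hypothetical stand-ins, and the data payload carried by a failed sequence is not modelled.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class Result:
    """Toy stand-in for a node result: a status flag plus optional data."""
    ok: bool
    data: Any = None


Node = Callable[[], Awaitable[Result]]


async def run_sequence(children: list[Node]) -> Result:
    """Run children in order and stop at the first FAIL."""
    last = Result(ok=True)  # an empty sequence yields OK(None)
    for child in children:
        last = await child()
        if not last.ok:
            return last
    return last


async def run_selector(children: list[Node]) -> Result:
    """Run children in order and stop at the first OK."""
    if not children:
        return Result(ok=True)  # the README states an empty selector returns OK(None)
    for child in children:
        result = await child()
        if result.ok:
            return result
    return Result(ok=False)


async def ok_a() -> Result:
    return Result(True, "a")


async def fail_b() -> Result:
    return Result(False, "b")


async def ok_c() -> Result:
    return Result(True, "c")
```

In this model `run_sequence([ok_a, fail_b, ok_c])` never reaches `ok_c`, while `run_selector([fail_b, ok_c, ok_a])` stops as soon as `ok_c` succeeds.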
Leaf Nodes [↑]
Function [↑]
Runs a sync/async function. Returns OK(data) for non-Result return values, or passes through a Result.
Usage:
- Accepts 0/1/2 params: (), (blackboard), (blackboard, tracer)
- Sync or async functions are supported
- Returning `Result` bypasses wrapping; otherwise `OK(value)`

Supported function forms:
- `() -> Any` or `() -> Result`
- `(blackboard) -> Any` or `(blackboard) -> Result`
- `(blackboard, tracer) -> Any` or `(blackboard, tracer) -> Result`
- Async variants of all the above
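
One way such flexible calling could work is to inspect the function's parameter count and await the result when needed. The sketch below uses only the standard library; `call_flexible` and `with_tracer` are hypothetical names for illustration, and the library's actual dispatch logic may differ.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_flexible(func: Callable[..., Any], blackboard: Any, tracer: Any) -> Any:
    """Call func with 0, 1, or 2 positional arguments; await it if it is async."""
    n_params = len(inspect.signature(func).parameters)
    if n_params > 2:
        raise TypeError(f"expected at most 2 parameters, got {n_params}")
    # Pass only as many leading arguments as the function declares.
    args = (blackboard, tracer)[:n_params]
    value = func(*args)
    if inspect.isawaitable(value):
        value = await value
    return value


async def with_tracer(blackboard: dict, tracer: str) -> tuple:
    """Example async function using both parameters."""
    return blackboard["x"], tracer
```

The same helper accepts `lambda: "ok1"`, `lambda b: b["x"]`, and `with_tracer` without the caller having to know which form was supplied.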
```python
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: "ok1")
    ._().Function(lambda blackboard: "ok2")
    ._().Function(lambda blackboard, tracer: "ok3")
    .End()
)
```

Log [↑]
Logs a message into the trace. Always returns OK(None).
Usage:
- `msg_or_factory`: string or `(blackboard) -> str`
- `level`: trace level (default: info)
- Emits a trace log entry and continues

```python
tree = (
    Tree()
    .Sequence()
    ._().Log("hello step1")
    ._().Log(lambda b: f"hello, step2: job={b.job_id}", level="debug")
    .End()
)
```

TODO [↑]
A placeholder node that always returns OK(None).
Usage:
- No-op leaf for scaffolding or TODO spots
- Replace with real nodes later
```python
tree = (
    Tree()
    .Sequence()
    ._().TODO("Prepare the Params")
    ._().TODO("Call the LLM")
    ._().Function(real_step)
    ._().TODO("Collect the result")
    .End()
)
```

ShowBlackboard [↑]
Returns the current blackboard in OK(b).
Usage:
- Helpful for debugging or inspection
- Downstream nodes can consume the returned blackboard
```python
tree = (
    Tree()
    .ShowBlackboard()
    .End()
)
```

WriteBlackboard [↑]
Writes the previous node's result into the blackboard, and returns OK(data).
Usage:
- `attr_or_func`: attribute name or `(blackboard, data) -> None`
- Reads `last_result.data`; warns if no last result
- Returns `OK(data)` (or `OK(None)` if missing)

```python
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 123)
    ._().WriteBlackboard("value")
    .End()
)
```

or:

```python
def _set_value(b: Blackboard, v: int) -> None:
    b.double_value = v * 2

tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 7)
    ._().WriteBlackboard(_set_value)
    .End()
)
```

Assert [↑]
Checks a boolean condition and returns OK(True) or FAIL(False).
Usage:
- Condition can be attr name or function
- Sync/async; params 0/1/2: (), (blackboard), (blackboard, tracer)
- `AssertionError` is treated as false

```python
tree = (
    Tree()
    .Sequence()
    ._().Assert(lambda: True)
    ._().Assert("is_ready")  # checks `blackboard.is_ready`
    ._().Function(run_job)
    .End()
)
```

Failure [↑]
Always returns FAIL(None).
Usage:
- Useful for tests, guards, or forcing failures
```python
tree = (
    Tree()
    .Selector()
    ._().Assert("has_cache")
    ._().Failure()
    .End()
)
```

Subtree [↑]
Runs another tree, optionally with a custom blackboard factory.
Usage:
- `subtree_blackboard_factory`: `(parent_blackboard) -> child_blackboard`
- Result is the subtree's result

```python
sub = (
    Tree()
    .Sequence()
    ._().Function(lambda: "x")
    .End()
)
tree = (
    Tree()
    .Sequence()
    ._().Subtree(sub)  # or _().Subtree(sub, lambda b: SubBlackboard(b.text))
    .End()
)
```

ParseJSON [↑]
Parses JSON from the last result or from a blackboard source, and returns the parsed object.
Usage:
- `src`: last result (default), blackboard attr, or `(blackboard) -> str`
- `dst`: optional blackboard attr or `(blackboard, data) -> None`
- Strips common `` ```json `` fences before parsing
- Default loader tries `json_repair` if installed, otherwise `orjson`, otherwise the standard library `json`
- Recommended: install `json_repair` when parsing LLM-generated or otherwise non-strict JSON
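
To make the fence-stripping step concrete, here is a minimal sketch using only the standard library `json` module. The helper names `strip_json_fence` and `parse_json_text` are hypothetical, and the regular expression is an assumption about what "common fences" means; the library's own handling may be more lenient.

```python
import json
import re
from typing import Any

# The fence marker is built programmatically so this example contains no literal fence.
FENCE = "`" * 3

# Optional "json" tag after the opening fence, then the payload, then the closing fence.
_FENCE_RE = re.compile(
    rf"^\s*{FENCE}(?:json)?\s*(.*?)\s*{FENCE}\s*$",
    re.DOTALL | re.IGNORECASE,
)


def strip_json_fence(text: str) -> str:
    """Return the payload inside a surrounding code fence, or the trimmed text."""
    match = _FENCE_RE.match(text)
    return match.group(1) if match else text.strip()


def parse_json_text(text: str) -> Any:
    """Strip a surrounding fence (if any) and parse strictly with the stdlib."""
    return json.loads(strip_json_fence(text))
```

Unlike `json_repair`, this strict variant raises `json.JSONDecodeError` on malformed input such as a missing closing brace.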
```python
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a":1}')
    ._().ParseJSON(dst="data")
    .End()
)
```

If `json_repair` is installed, no extra code is needed for tolerant parsing:

```python
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a": 1')
    ._().ParseJSON(dst="data")
    .End()
)
```

Another example:

```python
def set_parsed_value(b: Blackboard, d: JSON) -> None:
    b.parsed = d

tree = (
    Tree()
    .Sequence()
    ._().ParseJSON(src="raw_json", dst=set_parsed_value)
    .End()
)
```

LLM [↑]
Calls an LLM via openai-python and returns the output text. Supports streaming, API key factories, and OpenAI-compatible base URLs for custom LLM gateways.
Usage:
- `model` / `messages` can be values or `(blackboard) -> ...` factories
- `stream`: bool or `(blackboard) -> bool`; `stream_on_delta` supports sync/async callbacks
- `api_key`: string or factory `(blackboard)` / `(blackboard, model)`
- `base_url`: string or `(blackboard) -> str | None` for OpenAI-compatible providers
- `client_kwargs`: explicit kwargs forwarded to `AsyncOpenAI(...)`
- `extra_body`: explicit provider-specific request body fields merged into `extra_body`
- `**llm_call_kwargs`: regular request kwargs forwarded to `chat.completions.create(...)`
- `LLMModel` can optionally carry `input_price_per_m` / `output_price_per_m` in USD per 1M tokens
- Tracer records tokens when the provider returns usage
- Cost is taken from provider metadata when available; otherwise it falls back to token usage and the `LLMModel` prices
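
The cost fallback described in the last bullet amounts to simple arithmetic on token counts. The sketch below is an illustration only: `ModelPrices` and `estimate_cost_usd` are hypothetical names, and how the library actually reads provider cost metadata is not shown in this README.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ModelPrices:
    """Hypothetical price holder; both prices are USD per 1M tokens."""
    input_price_per_m: float
    output_price_per_m: float


def estimate_cost_usd(
    prices: ModelPrices,
    prompt_tokens: int,
    completion_tokens: int,
    provider_cost: Optional[float] = None,
) -> float:
    """Prefer the provider-reported cost; otherwise derive it from token usage."""
    if provider_cost is not None:
        return provider_cost
    total = (
        prompt_tokens * prices.input_price_per_m
        + completion_tokens * prices.output_price_per_m
    )
    return total / 1_000_000
```

With the prices from the example model (0.15 input, 0.60 output), one million prompt tokens plus half a million completion tokens come to about 0.45 USD.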
```python
provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model = LLMModel(
    "gpt-4.1-mini",
    provider=provider,
    extra_body={"reasoning": {"enabled": False}},
    input_price_per_m=0.15,
    output_price_per_m=0.60,
)
tree = (
    Tree()
    .Sequence()
    ._().LLM(model, [{"role": "user", "content": "hi"}])
    .End()
)
```

Streaming response example:

```python
from tinytasktree import Tree

def on_delta(b, full, delta, done, reason=""):
    if delta:
        print(delta, end="")

tree = (
    Tree()
    .Sequence()
    ._().LLM(lambda b: b.model, lambda b: b.messages, stream=True, stream_on_delta=on_delta)
    .End()
)
```

Tool Call [↑]
The `LLM` node supports tool definitions out of the box: it sends `Tool` schemas to the model,
then executes the returned tool calls in order. The node still performs exactly one model call;
use an outer `While` if the model should see the tool results and answer in a later LLM call.
```python
from tinytasktree import Context, JSON, Tool, Tree

class WeatherTool(Tool[Blackboard]):
    NAME = "get_weather"
    DESCRIPTION = "Get the current weather in a given location"
    SIGNATURES = ["get_weather(location: str) -> object"]
    EXAMPLES = ['get_weather({"location": "Beijing"})']
    SCHEMA = {"type": "object", "properties": {"location": {"type": "string"}}, "required": ["location"]}

    async def execute(self, blackboard: Blackboard, arguments: JSON, context: Context, tracer) -> JSON:
        return {"location": arguments["location"], "weather": "sunny", "temperature": 25}

tree = (
    Tree[Blackboard]("WeatherApp")
    .Sequence()
    ._().LLM(model, make_messages, tools=[WeatherTool()], on_llm_message=lambda b, message, tracer: b.messages.append(message))
    ._().WriteBlackboard("llm_record")
    .End()
)
```

The `LLM` node returns an `LLMRunRecord` containing final text, full messages, emitted
assistant/tool messages, requested tool calls, and tool results. Streaming tool call deltas
are accumulated before execution. Use `on_llm_message` to persist each emitted message as it
is produced.

Trace chat view:

```python
async def decide_next_step(b: Blackboard, tracer: Tracer, context: Context) -> Result:
    record = context._last_result.data
    b.done = not bool(record.tool_calls)
    # Mark the current node and the outer loop with a full chat transcript.
    tracer.update_attributes(chat_transcript=b.messages)
    context.parent_tracer(2).update_attributes(chat_transcript=b.messages)
    return Result.OK(record.final_output if b.done else None)
```

When `chat_transcript` is a list of messages, trace nodes are marked with `has_chat=True`;
the trace UI can then show the node as a chat-capable step and render user, assistant,
assistant tool call, and tool result messages together.
Composite Nodes [↑]
Sequence [↑]
Runs children in order. Returns FAIL on first failure, otherwise OK(last_child_data).
Usage:
- Stops on first `FAIL` and returns last successful data
- Empty sequence returns `OK(None)`

```python
tree = (
    Tree()
    .Sequence()
    ._().Function(A)
    ._().Function(B)
    .End()
)
```

Selector [↑]
Runs children in order until one succeeds. Returns the first OK, else FAIL.
Usage:
- Short-circuits on first success
- Empty selector returns `OK(None)`

```python
tree = (
    Tree()
    .Selector()
    ._().Failure()
    ._().Function(lambda: "ok")
    .End()
)
```

`Selector` is the typical choice for the fallback chain pattern:

```python
provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model1 = LLMModel("model1", provider=provider)
model2 = LLMModel("model2", provider=provider)
model3 = LLMModel("model3", provider=provider)
tree = (
    Tree()
    .Selector()
    ._().Timeout(20)
    ._()._().LLM(model1, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model2, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model3, llm_message)
    .End()
)
```

Parallel [↑]
Runs children concurrently. Returns OK only if all children succeed.
Usage:
- `concurrency_limit` must be > 0
- Result data is `None`
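
A bounded-concurrency run of this kind is commonly built on `asyncio.Semaphore`. The sketch below models children as coroutines returning a boolean status; `run_parallel` and `demo` are hypothetical names, and whether the library cancels siblings after the first failure is not stated here, so this version simply waits for all children.

```python
import asyncio
from typing import Awaitable, Callable


async def run_parallel(
    children: list[Callable[[], Awaitable[bool]]],
    concurrency_limit: int,
) -> bool:
    """Run all children, at most concurrency_limit at a time; OK only if all succeed."""
    if concurrency_limit <= 0:
        raise ValueError("concurrency_limit must be > 0")
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def guarded(child: Callable[[], Awaitable[bool]]) -> bool:
        async with semaphore:  # blocks while the limit is reached
            return await child()

    statuses = await asyncio.gather(*(guarded(child) for child in children))
    return all(statuses)


async def demo() -> tuple[bool, int]:
    """Run five children with a limit of two and record the peak concurrency."""
    running = 0
    peak = 0

    async def child() -> bool:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return True

    ok = await run_parallel([child] * 5, concurrency_limit=2)
    return ok, peak
```

`demo()` reports the overall status together with the highest number of children that were running at once, which never exceeds the limit.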
```python
tree = (
    Tree()
    .Parallel(concurrency_limit=2)
    ._().Function(A)
    ._().Function(B)
    .End()
)
```

Gather [↑]
Runs multiple subtrees with their own blackboards and returns a list of results.
Usage:
- `params_factory`: `(blackboard) -> (trees, blackboards)`
- Runs each tree with its paired blackboard
- Returns list of child data in tree order

```python
tree = (
    Tree()
    .Gather(lambda b: (trees, blackboards))
    .End()
)
```

A more detailed example:

```python
def build_params(b):
    trees = [subtree1, subtree2]
    bbs = [BB(x=1), BB(x=2)]
    return trees, bbs

tree = (
    Tree()
    .Gather(build_params, concurrency_limit=2)
    .End()
)
```

RandomSelector [↑]
Randomizes the child order (optionally weighted) and returns the first OK.
Usage:
- `weights`: list or `(blackboard) -> list[float]`
- Weights must be positive and match child count
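
One plausible way to produce a weighted random child order is repeated weighted sampling without replacement. The sketch below is an assumption about the general technique, not the library's algorithm; `weighted_order` is a hypothetical helper.

```python
import random
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def weighted_order(
    items: Sequence[T],
    weights: Sequence[float],
    rng: Optional[random.Random] = None,
) -> list[T]:
    """Return items in a random order where heavier items tend to come first."""
    if len(items) != len(weights):
        raise ValueError("weights must match the number of items")
    if any(w <= 0 for w in weights):
        raise ValueError("weights must be positive")
    rng = rng or random.Random()
    remaining = list(zip(items, weights))
    order: list[T] = []
    while remaining:
        # Pick one remaining item with probability proportional to its weight.
        idx = rng.choices(range(len(remaining)), weights=[w for _, w in remaining])[0]
        order.append(remaining.pop(idx)[0])
    return order
```

A selector would then try the children in the returned order and stop at the first success.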
```python
tree = (
    Tree()
    .RandomSelector(weights=[0.4, 0.4, 0.2])  # or weights from a factory: lambda b: b.route_weights
    ._().Function(A)
    ._().Function(B)
    ._().Function(C)
    .End()
)
```

If / Else [↑]
Conditional branch. If the condition is false and no else branch exists, returns OK(None).
Usage:
- Condition supports attr name or function (sync/async)
- 1 child (if) or 2 children (if + else)
- `Else` node must be a child of `If`

```python
tree = (
    Tree()
    .If(lambda b: b.flag)
    ._().Function(A)
    ._().Else()
    ._()._().Function(B)
    .End()
)
```

Or use only `If` (without `Else`):

```python
tree = (
    Tree()
    .If("is_admin")
    ._().Function(admin_flow)
    .End()
)
```

Decorator Nodes [↑]
ForceOk [↑]
Forces the result status to OK, optionally with a custom data factory.
Usage:
- Optional `result_factory(blackboard) -> data`
- If omitted, preserves child data

```python
tree = (
    Tree()
    .ForceOk()
    ._().Failure()
    .End()
)
```

Or a `ForceOk` overriding the result:

```python
tree = (
    Tree()
    .ForceOk(lambda b: {"skipped": True})
    ._().Function(best_effort)
    .End()
)
```

ForceFail [↑]
Forces the result status to FAIL, optionally with a custom data factory.
Usage:
- Optional `result_factory(blackboard) -> data`
- If omitted, preserves child data

```python
tree = (
    Tree()
    .ForceFail()
    ._().Function(lambda: "x")
    .End()
)
```

Return [↑]
Preserves child status but replaces data with a factory result.
Usage:
- `result_factory(blackboard) -> data`
- Status is unchanged from child

```python
tree = (
    Tree()
    .Return(lambda b: "data")
    ._().Function(A)
    .End()
)
```

Invert [↑]
Inverts child status while keeping data.
Usage:
- `OK` becomes `FAIL`, `FAIL` becomes `OK`
- Data is preserved

```python
tree = (
    Tree()
    .Invert()
    ._().Failure()
    .End()
)
```

Retry [↑]
Retries a child on failure, up to `max_tries` attempts, with optional sleeps between attempts.
Usage:
- `sleep_secs`: float or list per retry index
- Returns first `OK`, else `FAIL(None)`
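
The retry loop and the two accepted shapes of `sleep_secs` can be sketched as follows. This is an illustrative model, not the library's code: children return a `(ok, data)` tuple, `retry`, `sleep_for`, and `make_flaky` are hypothetical names, and reusing the last list entry when the list is shorter than the number of retries is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence, Union

SleepSpec = Union[float, Sequence[float]]


def sleep_for(sleep_secs: SleepSpec, retry_index: int) -> float:
    """Resolve the delay before retry number retry_index (0-based)."""
    if isinstance(sleep_secs, (int, float)):
        return float(sleep_secs)
    if not sleep_secs:
        return 0.0
    # Assumption: a short list reuses its last entry for later retries.
    return float(sleep_secs[min(retry_index, len(sleep_secs) - 1)])


async def retry(
    child: Callable[[], Awaitable[tuple[bool, Any]]],
    max_tries: int,
    sleep_secs: SleepSpec = 0.0,
) -> tuple[bool, Any]:
    """Return the first successful result, or (False, None) after max_tries."""
    for attempt in range(max_tries):
        ok, data = await child()
        if ok:
            return True, data
        if attempt < max_tries - 1:  # no sleep after the final attempt
            await asyncio.sleep(sleep_for(sleep_secs, attempt))
    return False, None


def make_flaky(failures: int) -> Callable[[], Awaitable[tuple[bool, Any]]]:
    """Build a child that fails the given number of times, then succeeds."""
    state = {"calls": 0}

    async def child() -> tuple[bool, Any]:
        state["calls"] += 1
        if state["calls"] <= failures:
            return False, None
        return True, "done"

    return child
```

A child that fails twice succeeds under `max_tries=3` but is reported as failed under `max_tries=2`.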
```python
tree = (
    Tree()
    .Retry(max_tries=3, sleep_secs=0.1)  # or a per-retry list: sleep_secs=[0.1, 0.2, 0.5]
    ._().Function(A)
    .End()
)
```

While [↑]
Repeats the child while the condition is true and returns the last successful result.
Usage:
- Condition supports attr name or function (sync/async)
- `max_loop_times` guards against infinite loops
```python
tree = (
    Tree()
    .While(lambda b: b.count < 3)
    ._().Function(step)
    .End()
)
```

Timeout / Fallback [↑]
Runs a child with a time limit. On timeout, runs the fallback child if provided.
Usage:
- `Timeout` has 1 child (main) or 2 (main + fallback)
- On timeout, returns `FAIL(None)` or executes fallback
- `Fallback` node must be a child of `Timeout` or `Terminable`
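
The timeout-with-fallback behavior maps naturally onto `asyncio.wait_for`. The following sketch is a simplified model under stated assumptions: children return plain data, `run_with_timeout` and the sample coroutines are hypothetical, and a fallback that completes is treated as a success.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Child = Callable[[], Awaitable[Any]]


async def run_with_timeout(
    main: Child,
    seconds: float,
    fallback: Optional[Child] = None,
) -> tuple[bool, Any]:
    """Run main with a time limit; on timeout run the fallback or report failure."""
    try:
        return True, await asyncio.wait_for(main(), timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the main coroutine at this point.
        if fallback is None:
            return False, None
        return True, await fallback()


async def slow() -> str:
    await asyncio.sleep(1)
    return "slow"


async def fast() -> str:
    return "fast"


async def on_timeout() -> str:
    return "fallback"
```

With a 10 ms limit, `slow` is cancelled and either `(False, None)` or the fallback's value is returned, while `fast` completes normally.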
```python
tree = (
    Tree()
    .Timeout(1.0)
    ._().Function(slow)
    ._().Function(on_timeout)
    .End()
)
```

With `.Fallback()`:

```python
tree = (
    Tree()
    .Timeout(2.0)
    ._().Function(main_job)
    ._().Fallback()
    ._()._().Function(fallback_job)
    .End()
)
```

Cacher [↑]
Caches child results in a key-value store. Optional value_validator invalidates stale cache.
Usage:
- `key_func(blackboard) -> str`, required `store`
- `expiration`: seconds, `timedelta`, or random `(min, max)`
- `value_validator`: `(blackboard)` or `(blackboard, tracer)`
- `enabled`: bool or `(blackboard) -> bool`
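
To show how a validator value can invalidate a stale entry, here is a minimal in-memory model. It is a sketch only: a plain `dict` stands in for the store, `cached_call` and `demo` are hypothetical names, and expiration is left out.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def cached_call(
    store: dict,
    key: str,
    validator_value: Any,
    compute: Callable[[], Awaitable[Any]],
) -> tuple[Any, bool]:
    """Return (data, was_cache_hit); a hit requires a matching validator value."""
    entry = store.get(key)
    if entry is not None and entry["validator"] == validator_value:
        return entry["data"], True
    data = await compute()
    # Store the validator value alongside the data for later comparison.
    store[key] = {"data": data, "validator": validator_value}
    return data, False


async def demo() -> list:
    """Call the cache three times, changing the validator value on the last call."""
    store: dict = {}
    calls = 0

    async def fetch_user() -> str:
        nonlocal calls
        calls += 1
        return f"user-data-{calls}"

    first = await cached_call(store, "user:1", "v1", fetch_user)
    second = await cached_call(store, "user:1", "v1", fetch_user)
    third = await cached_call(store, "user:1", "v2", fetch_user)
    return [first, second, third]
```

In `demo()` the second call is served from the cache, and the third recomputes because the validator value changed from `"v1"` to `"v2"`.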
```python
tree = (
    Tree()
    .Cacher(key_func=lambda b: b.key, store=store, enabled=lambda b: b.use_cache)
    ._().Function(expensive_call)
    .End()
)
```

Redis example:
tinytasktree itself does not depend on Redis, but `redis.asyncio.Redis` can be used directly as the store:

```python
import redis.asyncio as redis

store = redis.Redis.from_url("redis://127.0.0.1:6379")
tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, expiration=60)
    ._().Function(fetch_user)
    .End()
)
```

With a `value_validator`, a cached entry counts as a hit only if the validator's current value
matches the value stored when the entry was written. This is useful for invalidating the cache when dependent logic or state changes:

```python
tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, value_validator=lambda b: b.version)
    ._().Function(fetch_user)
    .End()
)
```

Terminable [↑]
Runs a child while polling a store key for termination. Optionally runs a fallback.
Usage:
- `key_func(blackboard) -> signal_key`, required `store`
- Monitors until key exists; then cancels child
- 1 child (main) or 2 (main + fallback)
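
The poll-and-cancel pattern can be sketched with plain `asyncio`. This is an illustrative model rather than the library's implementation: a `dict` stands in for the store, and `run_terminable`, `demo`, and the 10 ms poll interval are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_terminable(
    main: Callable[[], Awaitable[Any]],
    store: dict,
    signal_key: str,
    poll_interval: float = 0.01,
) -> tuple[bool, Any]:
    """Run main until it finishes or signal_key appears in the store."""
    task = asyncio.ensure_future(main())
    while not task.done():
        if signal_key in store:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # expected: the child was cancelled on purpose
            return False, None
        # Wake up when the task finishes or the poll interval elapses.
        await asyncio.wait({task}, timeout=poll_interval)
    return True, task.result()


async def demo() -> tuple:
    store: dict = {}

    async def long_job() -> str:
        await asyncio.sleep(5)
        return "finished"

    async def quick_job() -> str:
        await asyncio.sleep(0)
        return "finished"

    async def send_stop() -> None:
        await asyncio.sleep(0.03)
        store["stop:42"] = "1"  # external termination signal

    stopper = asyncio.create_task(send_stop())
    stopped = await run_terminable(long_job, store, "stop:42")
    await stopper
    completed = await run_terminable(quick_job, {}, "stop:42")
    return stopped, completed
```

In `demo()` the long job is cancelled shortly after the signal key is written, while the quick job finishes before any signal arrives.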
```python
tree = (
    Tree()
    .Terminable(lambda b: f"stop:{b.job_id}", store=store)
    ._().Function(A)
    ._().Fallback()
    ._()._().Function(B)
    .End()
)

# To trigger termination from an external script or process:
await store.set(f"stop:{job_id}", "1")
```

Wrapper [↑]
Wraps a child with a custom async context manager.
Usage:
- `func(child, context) -> async context manager` yielding a `Result`
- Useful for custom setup/teardown or instrumentation

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def traced(child, context):
    try:
        print("before run")
        result = await child(context)
        yield result
    finally:
        print("after run")

tree = (
    Tree()
    .Wrapper(traced)
    ._().Function(run)
    .End()
)
```

- `Context`: runtime state (blackboard stack, trace root, path)
- `TraceRoot` / `TraceNode`: structured trace tree
- `TraceStorageHandler` / `FileTraceStorageHandler`: save and load traces
- `register_global_hook_after_spawned_task_finish(hook)`: hook for Parallel/Gather/Terminable tasks
- `run_httpserver(host, port, trace_dir)` / `create_http_app(...)`: built-in HTTP trace server
- Install dev dependencies: `uv sync --dev`
- Build a wheel or sdist with bundled UI assets: `uv build`
- Lint: `uv run ruff check .`
- Test: `uv run pytest`
MIT. See LICENSE.txt.
- Metrics Handler
- Build a task tree from JSON
