orion-arm-ai/tinytasktree

tinytasktree

A tiny async task-tree orchestrator library for Python, behavior-tree inspired and LLM-ready.

Why tinytasktree?

  • Modular, composable task graph building blocks
  • Behavior-tree inspired control flow with explicit success/failure semantics
  • Async-first execution with local trace visualization

Hello World

from tinytasktree import Tree

tree = (
    Tree("HelloWorld")
    .Sequence()
    ._().Function(lambda: "hello")
    .End()
)

LLM Tool Call Example

import os
from dataclasses import dataclass

from tinytasktree import Context, JSON, Result, Tool, Tracer, Tree

@dataclass
class Blackboard:
    messages: list[JSON]
    done: bool = False


def make_messages(b: Blackboard) -> list[JSON]:
    system = {"role": "system", "content": "Use tools when useful, then answer with the tool result."}
    return [system, *b.messages]


class WeatherTool(Tool[Blackboard]):
    NAME = "get_weather"
    DESCRIPTION = "Get mock weather for a city."
    SCHEMA = {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}

    async def execute(self, blackboard: Blackboard, arguments: JSON, context: Context, tracer: Tracer) -> JSON:
        return {"city": arguments["city"], "condition": "sunny", "temperature_c": 25}


def store_llm_message(b: Blackboard, message: JSON, tracer: Tracer) -> None:
    b.messages.append(message)


async def decide_next_step(b: Blackboard, _tracer: Tracer, context: Context) -> Result:
    record = context._last_result.data
    b.done = not bool(record.tool_calls)
    return Result.OK(record.final_output if b.done else None)


api_key = os.getenv("LLM_API_KEY")
base_url = os.getenv("LLM_BASE_URL") or ""
extra_body = {"reasoning": {"enabled": False}}

tree = (
    Tree[Blackboard]("WeatherAgent")
    .While(lambda b: not b.done, max_loop_times=4)
    ._().Sequence()
    ._()._().LLM("deepseek-v4-flash", make_messages, api_key=api_key, base_url=base_url, extra_body=extra_body, tools=[WeatherTool()], on_llm_message=store_llm_message)
    ._()._().Function(decide_next_step)
    .End()
)

async def main():
    blackboard = Blackboard(messages=[{"role": "user", "content": "How is the weather in Tokyo?"}])
    context = Context()
    async with context.using_blackboard(blackboard):
        result = await tree(context)

    print(result.data)
    print(blackboard.messages)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Requirements

  • Python 3.13+
  • openai-python (only needed for LLM nodes)
  • A cache store backend is only needed for Cacher and Terminable nodes

Features

  • Minimal, expressive tree builder API
  • Async-first execution model
  • Leaf / Composite / Decorator nodes built-in
  • LLM integration via openai-python
  • Store-backed caching and termination signaling
  • Trace collection and optional trace storage
  • UI trace viewer with HTTP server

API Stability

⚠️ Alpha stage. Expect breaking API changes until the API is stabilized.

Installation

uv add tinytasktree

or

pip install tinytasktree

UI Trace Server

Save traces into the same directory that the backend serves, for example:

import asyncio

from tinytasktree import Context, FileTraceStorageHandler

storage = FileTraceStorageHandler(".traces")


async def main():
    # `tree` and `blackboard` are the tree and blackboard object built elsewhere in your code.
    context = Context()
    async with context.using_blackboard(blackboard):
        result = await tree(context)

    trace_id = await storage.save(context.trace_root())
    print("Trace URL:", f"http://127.0.0.1:8000/{trace_id}")


asyncio.run(main())

Then start the backend and UI:

python -m tinytasktree --httpserver --host 127.0.0.1 --port 8000 --trace-dir .traces
# Visit http://127.0.0.1:8000

Notes:

  • --trace-dir .traces must point to the same directory used by FileTraceStorageHandler(".traces")
  • Opening http://127.0.0.1:8000/ lists saved traces in the current trace directory, newest first
  • Opening http://127.0.0.1:8000/<trace_id> loads a specific trace directly

Design Notes

  • Execution model: nodes are awaited; composite nodes control child ordering and concurrency
  • Results: nodes return OK(data) or FAIL(data) and composites propagate or short-circuit
  • Blackboard: a per-run data object passed through the tree via Context

Node Reference [↑]

Node Result & Status [↑]

  • Every node returns a Result with a status (OK or FAIL) and an optional data payload
  • Composite nodes typically short-circuit on FAIL (e.g., Sequence) or on OK (e.g., Selector)
  • Decorators can override or invert status while preserving or transforming data
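
The short-circuit rules above can be illustrated with a small stdlib-only sketch. This is not tinytasktree's implementation: `MiniResult`, `run_sequence`, `run_selector`, and `invert` are hypothetical names, and returning FAIL(None) when every selector child fails is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class MiniResult:
    """Hypothetical stand-in for a node result: a status plus optional data."""
    ok: bool
    data: Any = None


Node = Callable[[], MiniResult]


def run_sequence(children: list[Node]) -> MiniResult:
    """Run children in order and stop at the first failure."""
    last = MiniResult(True, None)  # an empty sequence yields OK(None)
    for child in children:
        last = child()
        if not last.ok:
            return last  # short-circuit on FAIL
    return last  # OK carrying the last child's data


def run_selector(children: list[Node]) -> MiniResult:
    """Run children in order and stop at the first success."""
    if not children:
        return MiniResult(True, None)  # mirrors the Selector section: empty is OK(None)
    for child in children:
        result = child()
        if result.ok:
            return result  # short-circuit on OK
    return MiniResult(False, None)  # assumption: FAIL(None) when no child succeeds


def invert(child: Node) -> Node:
    """Decorator-style wrapper: flip the status, keep the data."""
    def wrapped() -> MiniResult:
        result = child()
        return MiniResult(not result.ok, result.data)
    return wrapped


def ok_a() -> MiniResult:
    return MiniResult(True, "a")


def fail_b() -> MiniResult:
    return MiniResult(False, "b")


print(run_sequence([ok_a, fail_b]))  # stops at the failing second child
print(run_selector([fail_b, ok_a]))  # stops at the succeeding second child
print(invert(fail_b)())              # status flipped, data kept
```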

Leaf Nodes [↑]

Function [↑]

Runs a sync/async function. Returns OK(data) for non-Result return values, or passes through a Result.

Usage:

  • Accepts 0/1/2 params: (), (blackboard), (blackboard, tracer)
  • Sync or async functions are supported
  • Returning Result bypasses wrapping; otherwise OK(value)

Supported function forms:

  • () -> Any or () -> Result
  • (blackboard) -> Any or (blackboard) -> Result
  • (blackboard, tracer) -> Any or (blackboard, tracer) -> Result
  • Async variants of all the above
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: "ok1")
    ._().Function(lambda blackboard: "ok2")
    ._().Function(lambda blackboard, tracer: "ok3")
    .End()
)
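
One way to support the three documented call forms is to inspect the function's parameter count and await the return value when it is awaitable. The sketch below only illustrates that idea; `call_flexible` is a hypothetical helper, not the library's code, and it covers only the `()`, `(blackboard)`, and `(blackboard, tracer)` forms listed above.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_flexible(func: Callable[..., Any], blackboard: Any, tracer: Any) -> Any:
    """Call func with 0, 1, or 2 positional args, awaiting async functions."""
    n_params = len(inspect.signature(func).parameters)
    if n_params > 2:
        raise TypeError(f"expected at most 2 parameters, got {n_params}")
    # Pass only as many leading arguments as the function declares.
    value = func(*(blackboard, tracer)[:n_params])
    if inspect.isawaitable(value):
        value = await value
    return value


async def async_form(blackboard: Any, tracer: Any) -> str:
    return f"{blackboard}/{tracer}"


async def demo() -> list[Any]:
    return [
        await call_flexible(lambda: "ok1", "bb", "tr"),
        await call_flexible(lambda blackboard: blackboard, "bb", "tr"),
        await call_flexible(async_form, "bb", "tr"),
    ]


print(asyncio.run(demo()))
```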

Log [↑]

Logs a message into the trace. Always returns OK(None).

Usage:

  • msg_or_factory: string or (blackboard) -> str
  • level: trace level (default: info)
  • Emits a trace log entry and continues
tree = (
    Tree()
    .Sequence()
    ._().Log("hello step1")
    ._().Log(lambda b: f"hello, step2: job={b.job_id}", level="debug")
    .End()
)

TODO [↑]

A placeholder node that always returns OK(None).

Usage:

  • No-op leaf for scaffolding or TODO spots
  • Replace with real nodes later
tree = (
    Tree()
    .Sequence()
    ._().TODO("Prepare the Params")
    ._().TODO("Call the LLM")
    ._().Function(real_step)
    ._().TODO("Collect the result")
    .End()
)

ShowBlackboard [↑]

Returns the current blackboard in OK(b).

Usage:

  • Helpful for debugging or inspection
  • Downstream nodes can consume the returned blackboard
tree = (
    Tree()
    .ShowBlackboard()
    .End()
)

WriteBlackboard [↑]

Writes the previous node's result into the blackboard, and returns OK(data).

Usage:

  • attr_or_func: attribute name or (blackboard, data) -> None
  • Reads last_result.data; warns if no last result
  • Returns OK(data) (or OK(None) if missing)
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 123)
    ._().WriteBlackboard("value")
    .End()
)

or:

def _set_value(b: Blackboard, v: int) -> None:
    b.double_value = v * 2

tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 7)
    ._().WriteBlackboard(_set_value)
    .End()
)

Assert [↑]

Checks a boolean condition and returns OK(True) or FAIL(False).

Usage:

  • Condition can be attr name or function
  • Sync/async; params 0/1/2: (), (blackboard), (blackboard, tracer)
  • AssertionError is treated as false
tree = (
    Tree()
    .Sequence()
    ._().Assert(lambda: True)
    ._().Assert("is_ready") # checks `blackboard.is_ready`
    ._().Function(run_job)
    .End()
)

Failure [↑]

Always returns FAIL(None).

Usage:

  • Useful for tests, guards, or forcing failures
tree = (
    Tree()
    .Selector()
    ._().Assert("has_cache")
    ._().Failure()
    .End()
)

Subtree [↑]

Runs another tree, optionally with a custom blackboard factory.

Usage:

  • subtree_blackboard_factory: (parent_blackboard) -> child_blackboard
  • Result is the subtree's result
sub = (
    Tree()
    .Sequence()
    ._().Function(lambda: "x")
    .End()
)

tree = (
    Tree()
    .Sequence()
    ._().Subtree(sub) # or _().Subtree(sub, lambda b: SubBlackboard(b.text))
    .End()
)

ParseJSON [↑]

Parses JSON from the last result or from a blackboard source, and returns the parsed object.

Usage:

  • src: last result (default), blackboard attr, or (blackboard) -> str
  • dst: optional blackboard attr or (blackboard, data) -> None
  • Strips common ```json fences before parsing
  • Default loader tries json_repair if installed, otherwise orjson, otherwise the standard library json
  • Recommended: install json_repair when parsing LLM-generated or otherwise non-strict JSON
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a":1}')
    ._().ParseJSON(dst="data")
    .End()
)

If json_repair is installed, no extra code is needed for tolerant parsing:

tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a": 1')
    ._().ParseJSON(dst="data")
    .End()
)

Another example:

def set_parsed_value(b: Blackboard, d: JSON) -> None:
    b.parsed = d

tree = (
    Tree()
    .Sequence()
    ._().ParseJSON(src="raw_json", dst=set_parsed_value)
    .End()
)
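
The fence stripping and loader fallback described above could look roughly like this. It is a sketch under assumptions, not ParseJSON's actual code: `pick_loader`, `strip_fences`, and `parse_json` are hypothetical names, and the exact fence pattern the library accepts is not documented here.

```python
import json
import re
from typing import Any, Callable

TICKS = "`" * 3  # a Markdown code fence marker

# Matches an optional fence (with or without a "json" tag) wrapped around the payload.
FENCE_RE = re.compile(
    rf"^\s*{TICKS}(?:json)?\s*(.*?)\s*{TICKS}\s*$",
    re.DOTALL | re.IGNORECASE,
)


def pick_loader() -> Callable[[str], Any]:
    """Prefer json_repair, then orjson, then the standard library json."""
    try:
        import json_repair  # optional: tolerant of malformed JSON
        return json_repair.loads
    except ImportError:
        pass
    try:
        import orjson  # optional: fast strict parser
        return orjson.loads
    except ImportError:
        return json.loads


def strip_fences(text: str) -> str:
    """Return the payload inside a fence, or the trimmed text if unfenced."""
    match = FENCE_RE.match(text)
    return match.group(1) if match else text.strip()


def parse_json(text: str) -> Any:
    return pick_loader()(strip_fences(text))


fenced = TICKS + 'json\n{"a": 1}\n' + TICKS
print(parse_json(fenced))
```

With only the standard library available, a truncated input such as `'{"a": 1'` raises a decode error, which is why installing json_repair is recommended for LLM output.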

LLM [↑]

Calls an LLM via openai-python and returns the output text. Supports streaming, API key factories, and OpenAI-compatible base URLs for custom LLM gateways.

Usage:

  • model / messages can be values or (blackboard) -> ... factories
  • stream: bool or (blackboard) -> bool; stream_on_delta supports sync/async callbacks
  • api_key: string or factory (blackboard) / (blackboard, model)
  • base_url: string or (blackboard) -> str | None for OpenAI-compatible providers
  • client_kwargs: explicit kwargs forwarded to AsyncOpenAI(...)
  • extra_body: explicit provider-specific request body fields merged into extra_body
  • **llm_call_kwargs: regular request kwargs forwarded to chat.completions.create(...)
  • LLMModel can optionally carry input_price_per_m / output_price_per_m in USD per 1M tokens
  • Tracer records tokens when the provider returns usage
  • Cost is taken from provider metadata when available; otherwise it falls back to token usage and the LLMModel prices
provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model = LLMModel(
    "gpt-4.1-mini",
    provider=provider,
    extra_body={"reasoning": {"enabled": False}},
    input_price_per_m=0.15,
    output_price_per_m=0.60,
)

tree = (
    Tree()
    .Sequence()
    ._().LLM(model, [{"role": "user", "content": "hi"}])
    .End()
)
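
The cost fallback mentioned above is simple arithmetic on token counts and per-million prices. The sketch below shows the calculation with hypothetical helper names (`token_cost_usd`, `resolve_cost_usd`); how the library actually reads provider cost metadata is not shown in this README.

```python
from typing import Optional


def token_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    input_price_per_m: float,
    output_price_per_m: float,
) -> float:
    """Cost from token counts and prices quoted in USD per 1M tokens."""
    total = prompt_tokens * input_price_per_m + completion_tokens * output_price_per_m
    return total / 1_000_000


def resolve_cost_usd(
    provider_cost: Optional[float],
    prompt_tokens: int,
    completion_tokens: int,
    input_price_per_m: Optional[float],
    output_price_per_m: Optional[float],
) -> Optional[float]:
    """Prefer the provider-reported cost; otherwise fall back to token pricing."""
    if provider_cost is not None:
        return provider_cost
    if input_price_per_m is None or output_price_per_m is None:
        return None  # no prices configured, so no estimate is possible
    return token_cost_usd(prompt_tokens, completion_tokens, input_price_per_m, output_price_per_m)


# 1,000 prompt tokens at $0.15/M plus 500 completion tokens at $0.60/M.
print(resolve_cost_usd(None, 1000, 500, 0.15, 0.60))
```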

Streaming response example:

from tinytasktree import Tree

def on_delta(b, full, delta, done, reason=""):
    if delta:
        print(delta, end="")

tree = (
    Tree()
    .Sequence()
    ._().LLM(lambda b: b.model, lambda b: b.messages, stream=True, stream_on_delta=on_delta)
    .End()
)

Tool Call [↑]

LLMNode supports tool definitions out of the box: it sends Tool schemas to the model, then executes returned tool calls in order. The node still performs exactly one model call; use an outer While if the model should see tool results and answer in a later LLM call.

from tinytasktree import Context, JSON, Tool, Tracer, Tree

class WeatherTool(Tool[Blackboard]):
    NAME = "get_weather"
    DESCRIPTION = "Get the current weather in a given location"
    SIGNATURES = ["get_weather(location: str) -> object"]
    EXAMPLES = ['get_weather({"location": "Beijing"})']
    SCHEMA = {"type": "object", "properties": {"location": {"type": "string"}}, "required": ["location"]}

    async def execute(self, blackboard: Blackboard, arguments: JSON, context: Context, tracer: Tracer) -> JSON:
        return {"location": arguments["location"], "weather": "sunny", "temperature": 25}

tree = (
    Tree[Blackboard]("WeatherApp")
    .Sequence()
    ._().LLM(model, make_messages, tools=[WeatherTool()], on_llm_message=lambda b, message, tracer: b.messages.append(message))
    ._().WriteBlackboard("llm_record")
    .End()
)

The LLM node returns an LLMRunRecord containing final text, full messages, emitted assistant/tool messages, requested tool calls, and tool results. Streaming tool call deltas are accumulated before execution. Use on_llm_message to persist each emitted message as it is produced.

Trace chat view:

async def decide_next_step(b: Blackboard, tracer: Tracer, context: Context) -> Result:
    record = context._last_result.data
    b.done = not bool(record.tool_calls)

    # Mark the current node and the outer loop with a full chat transcript.
    tracer.update_attributes(chat_transcript=b.messages)
    context.parent_tracer(2).update_attributes(chat_transcript=b.messages)

    return Result.OK(record.final_output if b.done else None)

When chat_transcript is a list of messages, trace nodes are marked with has_chat=True; the trace UI can then show the node as a chat-capable step and render user, assistant, assistant tool call, and tool result messages together.

Composite Nodes [↑]

Sequence [↑]

Runs children in order. Returns FAIL on first failure, otherwise OK(last_child_data).

Usage:

  • Stops on first FAIL and returns last successful data
  • Empty sequence returns OK(None)
tree = (
    Tree()
    .Sequence()
    ._().Function(A)
    ._().Function(B)
    .End()
)

Selector [↑]

Runs children in order until one succeeds. Returns the first OK, else FAIL.

Usage:

  • Short-circuits on first success
  • Empty selector returns OK(None)
tree = (
    Tree()
    .Selector()
    ._().Failure()
    ._().Function(lambda: "ok")
    .End()
)

Selector is the typical choice for the fallback chain pattern:

provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model1 = LLMModel("model1", provider=provider)
model2 = LLMModel("model2", provider=provider)
model3 = LLMModel("model3", provider=provider)

tree = (
    Tree()
    .Selector()
    ._().Timeout(20)
    ._()._().LLM(model1, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model2, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model3, llm_message)
    .End()
)

Parallel [↑]

Runs children concurrently. Returns OK only if all children succeed.

Usage:

  • concurrency_limit must be > 0
  • Result data is None
tree = (
    Tree()
    .Parallel(concurrency_limit=2)
    ._().Function(A)
    ._().Function(B)
    .End()
)
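
A concurrency limit of this kind is commonly built from `asyncio.Semaphore` plus `asyncio.gather`. The following sketch shows that pattern in isolation; `run_limited` is a hypothetical function, children are modelled as coroutine functions returning a boolean status, and nothing here is taken from the library's source.

```python
import asyncio
from typing import Awaitable, Callable

Child = Callable[[], Awaitable[bool]]


async def run_limited(children: list[Child], concurrency_limit: int) -> bool:
    """Run all children, at most concurrency_limit at a time; OK only if all succeed."""
    if concurrency_limit <= 0:
        raise ValueError("concurrency_limit must be > 0")
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def guarded(child: Child) -> bool:
        async with semaphore:  # blocks while the limit is reached
            return await child()

    statuses = await asyncio.gather(*(guarded(child) for child in children))
    return all(statuses)


async def demo() -> tuple[bool, int]:
    in_flight = 0
    peak = 0

    async def child() -> bool:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)  # record the highest observed concurrency
        await asyncio.sleep(0.01)
        in_flight -= 1
        return True

    ok = await run_limited([child] * 5, concurrency_limit=2)
    return ok, peak


print(asyncio.run(demo()))
```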

Gather [↑]

Runs multiple subtrees with their own blackboards and returns a list of results.

Usage:

  • params_factory: (blackboard) -> (trees, blackboards)
  • Runs each tree with its paired blackboard
  • Returns list of child data in tree order
tree = (
    Tree()
    .Gather(lambda b: (trees, blackboards))
    .End()
)

A more detailed example:

def build_params(b):
    trees = [subtree1, subtree2]
    bbs = [BB(x=1), BB(x=2)]
    return trees, bbs

tree = (
    Tree()
    .Gather(build_params, concurrency_limit=2)
    .End()
)

RandomSelector [↑]

Randomizes the child order (optionally weighted) and returns the first OK.

Usage:

  • weights: list or (blackboard) -> list[float]
  • Weights must be positive and match child count
tree = (
    Tree()
    .RandomSelector(weights=[0.4, 0.4, 0.2]) # or weights from a factory: lambda b: b.route_weights
    ._().Function(A)
    ._().Function(B)
    ._().Function(C)
    .End()
)
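
Weighted randomization of child order can be done by sampling without replacement. The sketch below is one possible approach, not necessarily the one RandomSelector uses; `weighted_order` is a hypothetical helper that returns child indices in the order they would be tried.

```python
import random
from typing import Optional, Sequence


def weighted_order(
    n_children: int,
    weights: Optional[Sequence[float]] = None,
    rng: Optional[random.Random] = None,
) -> list[int]:
    """Return child indices in a random order, biased by weights if given."""
    rng = rng or random.Random()
    if weights is None:
        order = list(range(n_children))
        rng.shuffle(order)
        return order
    if len(weights) != n_children:
        raise ValueError("weights must match the number of children")
    if any(w <= 0 for w in weights):
        raise ValueError("weights must be positive")

    remaining = list(range(n_children))
    order: list[int] = []
    while remaining:
        # Draw one index, weighted among the children not yet picked.
        pick = rng.choices(remaining, weights=[weights[i] for i in remaining], k=1)[0]
        order.append(pick)
        remaining.remove(pick)
    return order


print(weighted_order(3, [0.4, 0.4, 0.2], random.Random(0)))
```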

If / Else [↑]

Conditional branch. If the condition is false and no else branch exists, returns OK(None).

Usage:

  • Condition supports attr name or function (sync/async)
  • 1 child (if) or 2 children (if + else)
  • Else node must be a child of If
tree = (
    Tree()
    .If(lambda b: b.flag)
    ._().Function(A)
    ._().Else()
    ._()._().Function(B)
    .End()
)

Or use only If (without Else):

tree = (
    Tree()
    .If("is_admin")
    ._().Function(admin_flow)
    .End()
)

Decorator Nodes [↑]

ForceOk [↑]

Forces the result status to OK, optionally with a custom data factory.

Usage:

  • Optional result_factory(blackboard) -> data
  • If omitted, preserves child data
tree = (
    Tree()
    .ForceOk()
    ._().Failure()
    .End()
)

Or a ForceOk overriding the result:

tree = (
    Tree()
    .ForceOk(lambda b: {"skipped": True})
    ._().Function(best_effort)
    .End()
)

ForceFail [↑]

Forces the result status to FAIL, optionally with a custom data factory.

Usage:

  • Optional result_factory(blackboard) -> data
  • If omitted, preserves child data
tree = (
    Tree()
    .ForceFail()
    ._().Function(lambda: "x")
    .End()
)

Return [↑]

Preserves child status but replaces data with a factory result.

Usage:

  • result_factory(blackboard) -> data
  • Status is unchanged from child
tree = (
    Tree()
    .Return(lambda b: "data")
    ._().Function(A)
    .End()
)

Invert [↑]

Inverts child status while keeping data.

Usage:

  • OK becomes FAIL, FAIL becomes OK
  • Data is preserved
tree = (
    Tree()
    .Invert()
    ._().Failure()
    .End()
)

Retry [↑]

Retries a child on failure for up to max_tries with optional sleeps.

Usage:

  • sleep_secs: float or list per retry index
  • Returns first OK, else FAIL(None)
tree = (
    Tree()
    .Retry(max_tries=3, sleep_secs=0.1) # or a per-retry list: sleep_secs=[0.1, 0.2, 0.5]
    ._().Function(A)
    .End()
)
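
The retry loop with per-retry sleeps can be sketched as follows. The names `sleep_for` and `retry` are hypothetical, and reusing the last list entry when the list is shorter than the number of retries is an assumption rather than documented behavior.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, Union

SleepSpec = Union[float, Sequence[float]]


def sleep_for(sleep_secs: SleepSpec, retry_index: int) -> float:
    """Resolve the sleep before the next try from a float or a per-retry list."""
    if isinstance(sleep_secs, (int, float)):
        return float(sleep_secs)
    if not sleep_secs:
        return 0.0
    # Assumption: past the end of the list, keep using the last value.
    return float(sleep_secs[min(retry_index, len(sleep_secs) - 1)])


async def retry(child: Callable[[], Awaitable[bool]], max_tries: int, sleep_secs: SleepSpec = 0.0) -> bool:
    """Return True on the first successful try, False after max_tries failures."""
    for attempt in range(max_tries):
        if await child():
            return True
        if attempt < max_tries - 1:  # no sleep after the final failure
            await asyncio.sleep(sleep_for(sleep_secs, attempt))
    return False


calls = 0


async def flaky() -> bool:
    """Fails twice, then succeeds."""
    global calls
    calls += 1
    return calls >= 3


print(asyncio.run(retry(flaky, max_tries=3, sleep_secs=[0.0, 0.0])))
```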

While [↑]

Repeats the child while the condition is true and returns the last successful result.

Usage:

  • Condition supports attr name or function (sync/async)
  • max_loop_times guards infinite loops
tree = (
    Tree()
    .While(lambda b: b.count < 3)
    ._().Function(step)
    .End()
)

Timeout / Fallback [↑]

Runs a child with a time limit. On timeout, runs the fallback child if provided.

Usage:

  • Timeout has 1 child (main) or 2 (main + fallback)
  • On timeout, returns FAIL(None) or executes fallback
  • Fallback node must be a child of Timeout or Terminable
tree = (
    Tree()
    .Timeout(1.0)
    ._().Function(slow)
    ._().Function(on_timeout)
    .End()
)

With .Fallback():

tree = (
    Tree()
    .Timeout(2.0)
    ._().Function(main_job)
    ._().Fallback()
    ._()._().Function(fallback_job)
    .End()
)
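
The timeout-with-fallback behavior maps naturally onto `asyncio.wait_for`. The sketch below is illustrative only: `run_with_timeout` is a hypothetical function, and results are modelled as `(ok, data)` tuples instead of the library's Result type.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Job = Callable[[], Awaitable[Any]]


async def run_with_timeout(main: Job, secs: float, fallback: Optional[Job] = None) -> tuple[bool, Any]:
    """Run main under a time limit; on timeout, run fallback or report failure."""
    try:
        return True, await asyncio.wait_for(main(), timeout=secs)
    except asyncio.TimeoutError:
        if fallback is None:
            return False, None  # stands in for FAIL(None)
        return True, await fallback()


async def slow() -> str:
    await asyncio.sleep(1)
    return "main"


async def on_timeout() -> str:
    return "fallback"


print(asyncio.run(run_with_timeout(slow, 0.01, on_timeout)))
```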

Cacher [↑]

Caches child results in a key-value store. Optional value_validator invalidates stale cache.

Usage:

  • key_func(blackboard) -> str, required store
  • expiration: seconds, timedelta, or random (min, max)
  • value_validator: (blackboard) or (blackboard, tracer)
  • enabled: bool or (blackboard) -> bool
tree = (
    Tree()
    .Cacher(key_func=lambda b: b.key, store=store, enabled=lambda b: b.use_cache)
    ._().Function(expensive_call)
    .End()
)

Redis example:

tinytasktree itself does not depend on Redis, but redis.asyncio.Redis can be used directly as the store:

import redis.asyncio as redis

store = redis.Redis.from_url("redis://127.0.0.1:6379")

tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, expiration=60)
    ._().Function(fetch_user)
    .End()
)

With a value_validator, a cached entry counts as a hit only if the validator's current value matches the value stored when the entry was written. This is useful for invalidating the cache when dependent logic or state changes:

tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, value_validator=lambda b: b.version)
    ._().Function(fetch_user)
    .End()
)
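
The validator check and the expiration forms can be illustrated with an in-memory dict standing in for the store. This is a sketch under assumptions: `resolve_expiration` and `cached_call` are hypothetical names, the JSON envelope is an invented storage format, and the dict does not enforce the expiration.

```python
import asyncio
import json
import random
from datetime import timedelta
from typing import Any, Awaitable, Callable, Optional, Union

Expiration = Union[None, float, timedelta, tuple[float, float]]


def resolve_expiration(expiration: Expiration) -> Optional[float]:
    """Normalize seconds, a timedelta, or a random (min, max) range to seconds."""
    if expiration is None:
        return None
    if isinstance(expiration, timedelta):
        return expiration.total_seconds()
    if isinstance(expiration, tuple):
        low, high = expiration
        return random.uniform(low, high)  # jitter spreads out simultaneous expiries
    return float(expiration)


async def cached_call(
    store: dict[str, str],
    key: str,
    compute: Callable[[], Awaitable[Any]],
    validator_value: Any = None,
) -> tuple[Any, bool]:
    """Return (data, was_hit); a stored entry hits only if its validator matches."""
    raw = store.get(key)
    if raw is not None:
        entry = json.loads(raw)
        if entry["validator"] == validator_value:
            return entry["data"], True
    data = await compute()
    store[key] = json.dumps({"validator": validator_value, "data": data})
    return data, False


computations = 0


async def fetch_user() -> dict[str, str]:
    global computations
    computations += 1
    return {"name": "alice"}


async def demo() -> list[bool]:
    store: dict[str, str] = {}
    hits = []
    for version in (1, 1, 2):  # the version bump on the third call invalidates the entry
        _, hit = await cached_call(store, "user:7", fetch_user, validator_value=version)
        hits.append(hit)
    return hits


print(asyncio.run(demo()))
```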

Terminable [↑]

Runs a child while polling a store key for termination. Optionally runs a fallback.

Usage:

  • key_func(blackboard) -> signal_key, required store
  • Monitors until key exists; then cancels child
  • 1 child (main) or 2 (main + fallback)
tree = (
    Tree()
    .Terminable(lambda b: f"stop:{b.job_id}", store=store)
    ._().Function(A)
    ._().Fallback()
    ._()._().Function(B)
    .End()
)

# To trigger termination from an external script or process:
await store.set(f"stop:{job_id}", "1")
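
The poll-and-cancel pattern behind this node can be sketched with plain asyncio tasks. In this illustration a dict stands in for the store, `run_terminable` and `poll_interval` are hypothetical names, and results are `(ok, data)` tuples; the library's real polling interval and result data are not documented here.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_terminable(
    main: Callable[[], Awaitable[Any]],
    store: dict[str, str],
    signal_key: str,
    poll_interval: float = 0.01,
) -> tuple[bool, Any]:
    """Run main, cancelling it as soon as signal_key appears in the store."""
    task = asyncio.ensure_future(main())
    while not task.done():
        if signal_key in store:
            task.cancel()
            try:
                await task  # let the cancellation propagate and clean up
            except asyncio.CancelledError:
                pass
            return False, None
        # Wait for either the task to finish or the next poll tick.
        await asyncio.wait({task}, timeout=poll_interval)
    return True, task.result()


async def demo() -> tuple[bool, Any]:
    store: dict[str, str] = {}

    async def long_job() -> str:
        await asyncio.sleep(5)
        return "finished"

    async def send_stop() -> None:
        await asyncio.sleep(0.03)
        store["stop:42"] = "1"  # the external termination signal

    stopper = asyncio.create_task(send_stop())
    outcome = await run_terminable(long_job, store, "stop:42")
    await stopper
    return outcome


print(asyncio.run(demo()))
```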

Wrapper [↑]

Wraps a child with a custom async context manager.

Usage:

  • func(child, context) -> async context manager yielding a Result
  • Useful for custom setup/teardown or instrumentation
from contextlib import asynccontextmanager

@asynccontextmanager
async def traced(child, context):
    try:
        print("before run")
        result = await child(context)
        yield result
    finally:
        print("after run")

tree = (
    Tree()
    .Wrapper(traced)
    ._().Function(run)
    .End()
)

Core APIs (Non-Node)

  • Context: runtime state (blackboard stack, trace root, path)
  • TraceRoot / TraceNode: structured trace tree
  • TraceStorageHandler / FileTraceStorageHandler: save and load traces
  • register_global_hook_after_spawned_task_finish(hook): hook for Parallel/Gather/Terminable tasks
  • run_httpserver(host, port, trace_dir) / create_http_app(...): built-in HTTP trace server

Contributing

  • Install dev dependencies: uv sync --dev
  • Build a wheel or sdist with bundled UI assets: uv build
  • Lint: uv run ruff check .
  • Test: uv run pytest

License

MIT. See LICENSE.txt.

TODO

  • Metrics Handler
  • Build a task tree from JSON
