A multithreaded executor for static computation graphs. Each compute node can hold a mutable state, read inputs and produce outputs that may contain references into the node's state or into its inputs.
A diamond-shaped graph - source s, a = s + 1, and d = s + a:
use std::thread;
use flowgraph::core::Pool;
use flowgraph::typed::{Graph, GraphBuilder, Port, Segment, Source};

/// A stateless increment operator.
struct Inc;
impl Segment for Inc {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = ();
    fn init(self) {}
    fn compute<'a, 'b: 'a>((_, x): (bool, i64), _: &'b mut (), _: bool) -> (bool, i64) {
        (true, x + 1)
    }
}

/// A stateless adder with two inputs.
struct Add;
impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();
    fn init(self) {}
    fn compute<'a, 'b: 'a>(
        ((_, a), (_, b)): ((bool, i64), (bool, i64)),
        _: &'b mut (),
        _: bool,
    ) -> (bool, i64) {
        (true, a + b)
    }
}

fn main() {
    let mut b = GraphBuilder::new();
    let s = b.push_source(Source::new(1));
    let a = b.push(Inc, *s);
    let d = b.push(Add, (*s, a));
    let mut g = Graph::from_builder(b);
    let mut pool = Pool::new(thread::available_parallelism().unwrap().get());
    assert_eq!(g.view(*s), 1);
    assert_eq!(g.view(a), 2);
    assert_eq!(g.view(d), 3);

    // Update the source value and recompute.
    *g.state_mut(s) = 5;
    g.stabilize(&mut pool);
    assert_eq!(g.view(a), 6);
    assert_eq!(g.view(d), 11);
}

Key calls:
- `GraphBuilder::push_source(src)` pushes an input-less source segment and hands back a `SourceHandle`, the only handle you can poke. Use `Source::new(v)` for a simple scalar (carried by value) or `RefSource::new(v)` for a by-reference source (e.g. a non-`Copy` `Vec`). The `SourceHandle` derefs to its output handle, so wire and read it as `*handle`.
- `GraphBuilder::push(segment, input_handles) -> output_handles` wires a node. The input-handle shape matches the segment's `Inputs` tree (`()`, a single `Handle`, a tuple); the returned output handles match `Outputs`.
- `Graph::from_builder(b)` freezes the topology and runs every node's build call.
- `g.view(handle) -> T` reads a `ViewPort` output by value; `g.ref_view(handle) -> &T` reads a `RefViewPort` output by reference (both only between stabilizations).
- `g.state_mut(source_handle) -> &mut T` hands you a source's state and marks it dirty for the next `stabilize`.
- `g.stabilize(&mut pool)` runs one generation.
Each unit of scheduling is a Segment: it has typed inputs and outputs,
a mutable state, and a compute function.
use flowgraph::typed::*;

pub trait Segment {
    type Inputs: Interface;
    type Outputs: Interface;
    type State: Send + 'static;
    fn init(self) -> Self::State;
    fn compute<'a, 'b: 'a>(
        inputs: <Self::Inputs as Interface>::Values<'a>,
        state: &'b mut Self::State,
        init: bool,
    ) -> <Self::Outputs as Interface>::Values<'a>;
}

- `init(self) -> State` allocates the node's state (and its output storage).
- `compute(inputs, &mut state, init)` runs whenever the node is in the dirty cone. It fills `state` and returns its output payload tree: references into its own state, references forwarded from its inputs, or by-value views. The `init` flag is `true` exactly once, on the build call that the graph builder performs to discover the output shape. Use it to seed state or size buffers.
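One way to picture the "dirty cone" is as the set of nodes reachable from a modified source. The sketch below is a standalone illustration and does not use the library: the function name `dirty_cone` and the index-based graph encoding are assumptions made for this example, and the real executor may track dirtiness differently.

```rust
/// Marks every node that is reachable from a modified source.
///
/// Assumption: `producers[i]` holds the indices of the nodes feeding node `i`,
/// and every producer index is smaller than `i` (topological order).
fn dirty_cone(producers: &[Vec<usize>], modified_sources: &[usize]) -> Vec<bool> {
    let mut dirty = vec![false; producers.len()];
    for &s in modified_sources {
        dirty[s] = true;
    }
    // One forward pass suffices because producers always precede consumers.
    for node in 0..producers.len() {
        if producers[node].iter().any(|&p| dirty[p]) {
            dirty[node] = true;
        }
    }
    dirty
}
```

For the diamond graph above (s = 0, a = 1, d = 2), modifying the source puts all three nodes in the cone, while nodes fed only by an untouched source stay outside it.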
Payloads are Copy borrows, not owned values — this is what makes edges
zero-copy. The lifetime bound 'b: 'a lets you return either &'a references
into the inputs, or &'a references into &'b mut state.
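The effect of the `'b: 'a` bound can be seen without the library. The function below is a hypothetical stand-in that keeps the lifetime shape of `compute` but uses plain slices instead of port payloads; `forward_or_double` and its `forward` flag are invented for this sketch.

```rust
/// Hypothetical stand-in for a compute function: same lifetime shape as
/// `Segment::compute`, but with plain references instead of port payloads.
fn forward_or_double<'a, 'b: 'a>(
    input: &'a [i64],
    state: &'b mut Vec<i64>,
    forward: bool,
) -> &'a [i64] {
    if forward {
        // Borrow forwarded from the inputs: it already lives for 'a.
        input
    } else {
        // Recompute into the node's own buffer, then lend it out.
        state.clear();
        state.extend(input.iter().map(|x| x * 2));
        // Reborrowing through `&'b mut` can yield `&'a` because 'b outlives 'a.
        state.as_slice()
    }
}
```

Both branches return the same type, so a caller cannot tell whether the data was forwarded or freshly computed; in neither case is the slice copied into the output.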
Inputs / Outputs are an interface tree built from leaves and tuples.
Each leaf is generic over a ValueView V (how a value rides
the wire); the aliases Port<T> / RefPort<T> / RefPorts<T> name the common
scalar case (= …<Scalar<T>>):
- `ViewPort<V>`: a single by-value view leaf (see Value ports). Payload: `(bool, V::View<'a>)`. Alias: `Port<T>` (= `ViewPort<Scalar<T>>`).
- `RefViewPort<V>`: a single by-reference leaf. Payload: `(bool, &V::View<'a>)`. `RefPort<T>` (= `RefViewPort<Scalar<T>>`) is the plain `&T` port.
- `RefViewPorts<V>`: a runtime-length group of `RefViewPort`s (a notify plane and a value plane, structure-of-arrays). Payload: `(&[bool], &[&V::View<'a>])`. Alias: `RefPorts<T>` (= `RefViewPorts<Scalar<T>>`).
- `()` and tuples `(A, B, …)` up to arity 12: compound branches.
For runtime-length groups, the length must remain fixed after the first call so that the computation graph stays static and well defined. A violation is detected at runtime and causes a panic.
Producing a RefPorts output requires creating an array of references with
lifetime 'a matching the inputs. This allows for simple forwarding of input
references, but it also creates difficulty for a node that computes its own
values: the references cannot be local variables, but cannot be stored in the
state either (&'b mut State is invariant over the State type, and State must
be 'static, so it cannot hold 'a references; if it could, they might dangle on
the next generation).
To address this difficulty, the library provides Arena: a per-node bump
arena which can be stored in node states. However, allocated objects are never
dropped - they are simply overwritten in place on the next generation.
Implementing proper dropping would introduce overhead and complexity.
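The overwrite-in-place behaviour can be approximated in safe Rust. The sketch below is not the library's Arena: `ScratchArena`, its methods, and the `squares` node are hypothetical, and the element type is restricted to `Copy` so that skipping drops is trivially harmless. It also fills the arena completely before lending references out, which avoids the interior mutability or unsafe code that a real bump arena would likely need.

```rust
/// Hypothetical stand-in for a per-node bump arena (not the library's `Arena`).
/// Restricting `T` to `Copy` means skipping `Drop` can never leak resources.
struct ScratchArena<T: Copy> {
    slots: Vec<T>, // backing storage, kept alive across generations
    used: usize,   // bump cursor for the current generation
}

impl<T: Copy> ScratchArena<T> {
    fn new() -> Self {
        ScratchArena { slots: Vec::new(), used: 0 }
    }

    /// Starts a new generation: rewinds the cursor, drops nothing.
    fn reset(&mut self) {
        self.used = 0;
    }

    /// Bump-allocates one value, overwriting a stale slot when one exists.
    fn alloc(&mut self, value: T) {
        if self.used < self.slots.len() {
            self.slots[self.used] = value;
        } else {
            self.slots.push(value);
        }
        self.used += 1;
    }

    /// Values allocated in the current generation.
    fn live(&self) -> &[T] {
        &self.slots[..self.used]
    }

    /// Number of slots ever allocated, including stale ones.
    fn backing_len(&self) -> usize {
        self.slots.len()
    }
}

/// A node that computes its own values and returns references to them.
fn squares<'a, 'b: 'a>(inputs: &'a [i64], arena: &'b mut ScratchArena<i64>) -> Vec<&'a i64> {
    arena.reset();
    for &x in inputs {
        arena.alloc(x * x);
    }
    // All writes are done, so the arena can now be borrowed immutably for 'a.
    arena.live().iter().collect()
}
```

After a generation with three inputs followed by one with two, the third slot still holds its stale value: it was neither dropped nor freed, only left outside the live range.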
Each port carries a bool flag alongside the value or reference, indicating
whether the port notifies its consumers in this generation. A source node whose
value was not modified in a generation has its flag set to false (no notify),
and a compute node can set its output flag to false to tell downstream nodes
that they may skip processing.
use flowgraph::typed::*;

struct Abs;
impl Segment for Abs {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = i64; // Remembers the previous output
    fn init(self) -> i64 { 0 }
    fn compute<'a, 'b: 'a>((_, x): (bool, i64), state: &'b mut i64, _: bool) -> (bool, i64) {
        let new = x.abs();
        let changed = new != *state; // Notify only when |x| actually changed
        *state = new;
        (changed, new)
    }
}

The Operator trait is a thin wrapper over Segment: it splits the compute
function into two paths, an actual processing path and a passthrough path,
and chooses the latter when all input notify flags indicate no change.
The graph executor may also choose to skip some nodes completely if no upstream source node is modified.
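The two-path split can be sketched independently of the library. Everything named below (`TwoPath`, `dispatch`, `CachedSum`) is hypothetical and only mirrors the description above; the actual Operator trait is not shown in this document and may look different.

```rust
/// Hypothetical two-path operator; the names and signatures are illustrative
/// and are not taken from the library's `Operator` trait.
trait TwoPath {
    type State;
    /// Full path: at least one input notified, so recompute and update state.
    fn process(inputs: (i64, i64), state: &mut Self::State) -> i64;
    /// Cheap path: nothing changed, so re-emit from read-only state.
    fn passthrough(state: &Self::State) -> i64;
}

/// Chooses the path from the input notify flags, as a wrapper over a
/// `compute`-style function might.
fn dispatch<Op: TwoPath>(
    ((notify_a, a), (notify_b, b)): ((bool, i64), (bool, i64)),
    state: &mut Op::State,
) -> (bool, i64) {
    if notify_a || notify_b {
        (true, Op::process((a, b), state))
    } else {
        (false, Op::passthrough(state))
    }
}

/// Example operator: caches the last sum in its state.
struct CachedSum;

impl TwoPath for CachedSum {
    type State = i64;

    fn process((a, b): (i64, i64), state: &mut i64) -> i64 {
        *state = a + b;
        *state
    }

    fn passthrough(state: &i64) -> i64 {
        *state
    }
}
```

In this sketch the passthrough path takes the state by shared reference, so it cannot modify or reallocate anything the previous generation handed out, and it clears the notify flag so that downstream nodes can skip as well.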
Multi-threaded execution has a cost: every node is a thread-pool task, and waking a worker to run a trivial node can take much longer than the computation inside the node itself. It is therefore desirable to fuse tiny segments into a single node. (Fusing heavy nodes is equally possible, but doing so prevents them from running in parallel.)
The library provides combinator methods to fuse segments into larger segments:
| Combinator | Method | Meaning |
|---|---|---|
| `Id<T>` | - | Identity: outputs are inputs unchanged. |
| `Comp(f, g)` | `f.then(g)` | Composition: outputs `g(f(x))`. |
| `Left<T, U>` / `Right<T, U>` | - | Projection: outputs the first / second element of a pair. |
| `Fork(f, g)` | `f.fork(g)` | Fan-out: feed the same input to both, then pair outputs. |
| `Par(f, g)` | `f.par(g)` | Parallel composition: run `f`, `g` on a pair of inputs, then pair outputs. Equivalent to `Fork(Comp(Left, f), Comp(Right, g))`. |
| `Arr::new(\|values, _\| …)` | - | Applies a stateful closure to the inputs. |
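The algebra in the table can be tried out with plain closures. The free functions below (`comp`, `fork`, `par`, `left`, `right`) are hypothetical analogues written for this sketch; they operate on ordinary values rather than segments and are not the library's combinator types.

```rust
/// Composition: run `f`, then feed its output to `g`.
fn comp<A, B, C>(f: impl Fn(A) -> B, g: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |x| g(f(x))
}

/// Fan-out: feed the same (copyable) input to both, then pair the outputs.
fn fork<A: Copy, B, C>(f: impl Fn(A) -> B, g: impl Fn(A) -> C) -> impl Fn(A) -> (B, C) {
    move |x| (f(x), g(x))
}

/// Parallel composition: run `f` and `g` on the two halves of a pair.
fn par<A, B, C, D>(f: impl Fn(A) -> C, g: impl Fn(B) -> D) -> impl Fn((A, B)) -> (C, D) {
    move |(a, b)| (f(a), g(b))
}

/// Projection onto the first element of a pair.
fn left<A, B>((a, _): (A, B)) -> A {
    a
}

/// Projection onto the second element of a pair.
fn right<A, B>((_, b): (A, B)) -> B {
    b
}
```

With `inc = |x| x + 1` and `dbl = |x| x * 2`, both `par(inc, dbl)` and `fork(comp(left, inc), comp(right, dbl))` map `(3, 4)` to `(4, 8)`, which matches the equivalence stated for Par in the table.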
However, point-free combinators get unreadable fast. As an alternative, the
library also provides the segment! macro, a DSL that compiles down to
combinators, much like arrow notation in Haskell [1]:
use flowgraph::typed::*;
use flowgraph::segment;

// A stateless adder with two inputs.
struct Add;
impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();
    fn init(self) {}
    fn compute<'a, 'b: 'a>(
        ((_, a), (_, b)): ((bool, i64), (bool, i64)),
        _: &'b mut (),
        _: bool,
    ) -> (bool, i64) {
        (true, a + b)
    }
}

let mut b = GraphBuilder::new();
let s = b.push_source(Source::new(1));
let t = b.push_source(Source::new(2));

// One fused node computing: c = b + a; d = c + a; e = d + a; result (e, d, c).
// Type annotation is needed on inputs and outputs.
let seg = segment!(|a: Port<i64>, b: Port<i64>| -> (Port<i64>, Port<i64>, Port<i64>) {
    let c = (b, a) => Add;
    let d = (c, a) => Add;
    let e = (d, a) => Add;
    (e, d, c)
});
let (e, d, c) = b.push(seg, (*s, *t));

The library uses `unsafe` internally, but the typed API is safe to use.
For more details, the following runtime invariants are maintained internally:
- Single writer per slot. Each output slot has exactly one producing node; each input slot is scattered into by exactly that one producer. Concurrent computes write disjoint slots.
- Per-generation lifetime. Output pointers stay valid as long as inputs and state are unchanged; passthrough's `&State` forbids reallocation, keeping out-of-cone consumers' pointers live across generations.
- Read guard. Reading a slot after poking a source but before `stabilize` panics rather than dereferencing a possibly-stale forwarded pointer.
- Poison on panic. If a `compute` panics, downstream slots may hold dangling forwarded pointers, so the graph is poisoned: the pool catches the panic, settles the batch (no hang), re-raises out of `stabilize`, and every later `stabilize` or slot read panics. There is no recovery; treat the graph as dead. For recoverable failures, make the failure a value (e.g. a `Result<T, E>` cell) instead of panicking.
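The poison-on-panic rule follows a common pattern that can be sketched with the standard library alone. `PoisonGuard` and its `run` method are hypothetical names for this illustration; the library's pool and graph are more involved, and this sketch only shows the catch, mark, and re-raise sequence.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical guard that refuses all work after the first panic.
struct PoisonGuard {
    poisoned: bool,
}

impl PoisonGuard {
    /// Runs `job`; if it panics, records the poison and re-raises the panic.
    fn run<R>(&mut self, job: impl FnOnce() -> R) -> R {
        // Every later call panics immediately once the guard is poisoned.
        assert!(!self.poisoned, "graph is poisoned by an earlier panic");
        match panic::catch_unwind(AssertUnwindSafe(job)) {
            Ok(result) => result,
            Err(payload) => {
                self.poisoned = true;
                // Propagate the original panic to the caller.
                panic::resume_unwind(payload)
            }
        }
    }
}
```

A job that returns an error value instead of panicking passes through `run` as an ordinary result and leaves the guard usable, which is the reason for modelling recoverable failures as values.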
The tests have been run under Miri to check for memory-safety violations and common forms of undefined behavior.