bridgekat/flowgraph


Flowgraph

A multithreaded executor for static computation graphs. Each compute node can hold mutable state, read inputs, and produce outputs that may contain references into the node's state or into its inputs.

Examples

A diamond-shaped graph - source s, a = s + 1, and d = s + a:

use std::thread;
use flowgraph::core::Pool;
use flowgraph::typed::{Graph, GraphBuilder, Port, Segment, Source};

/// A stateless increment operator.
struct Inc;

impl Segment for Inc {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>((_, x): (bool, i64), _: &'b mut (), _: bool) -> (bool, i64) {
        (true, x + 1)
    }
}

/// A stateless adder with two inputs.
struct Add;

impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>(
        ((_, a), (_, b)): ((bool, i64), (bool, i64)),
        _: &'b mut (),
        _: bool,
    ) -> (bool, i64) {
        (true, a + b)
    }
}

fn main() {
    let mut b = GraphBuilder::new();
    let s = b.push_source(Source::new(1));
    let a = b.push(Inc, *s);
    let d = b.push(Add, (*s, a));
    let mut g = Graph::from_builder(b);
    let mut pool = Pool::new(thread::available_parallelism().unwrap().get());

    assert_eq!(g.view(*s), 1);
    assert_eq!(g.view(a), 2);
    assert_eq!(g.view(d), 3);

    // Update the source value and recompute.
    *g.state_mut(s) = 5;
    g.stabilize(&mut pool);

    assert_eq!(g.view(a), 6);
    assert_eq!(g.view(d), 11);
}

Key calls:

  • GraphBuilder::push_source(src) pushes an input-less source segment and hands back a SourceHandle — the only handle you can poke. Use Source::new(v) for a simple scalar (carried by value) or RefSource::new(v) for a by-reference source (e.g. a non-Copy Vec). The SourceHandle derefs to its output handle, so wire and read it as *handle.
  • GraphBuilder::push(segment, input_handles) -> output_handles wires a node. The input-handle shape matches the segment's Inputs tree ((), a single Handle, a tuple); the returned output handles match Outputs.
  • Graph::from_builder(b) freezes the topology and runs every node's build call.
  • g.view(handle) -> T reads a ViewPort output by value; g.ref_view(handle) -> &T reads a RefViewPort output by reference (both only between stabilizations).
  • g.state_mut(source_handle) -> &mut T hands you a source's state and marks it dirty for the next stabilize.
  • g.stabilize(&mut pool) runs one generation.

Concepts

Segments

Each unit of scheduling is a Segment: it has typed inputs and outputs, a mutable state, and a compute function.

use flowgraph::typed::*;

pub trait Segment {
    type Inputs: Interface;
    type Outputs: Interface;
    type State: Send + 'static;

    fn init(self) -> Self::State;

    fn compute<'a, 'b: 'a>(
        inputs: <Self::Inputs as Interface>::Values<'a>,
        state: &'b mut Self::State,
        init: bool,
    ) -> <Self::Outputs as Interface>::Values<'a>;
}
  • init(self) -> State allocates the node's state (and its output storage).
  • compute(inputs, &mut state, init) runs whenever the node is in the dirty cone (downstream of a modified source). It fills state and returns its output payload tree: references into its own state, references forwarded from its inputs, or by-value views. The init flag is true exactly once, on the build call made during graph construction to discover the output shape. Use it to seed state or size buffers.

Payloads are Copy borrows, not owned values — this is what makes edges zero-copy. The lifetime bound 'b: 'a lets you return either &'a references into the inputs, or &'a references into &'b mut state.
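
The effect of the 'b: 'a bound can be seen in plain Rust without the library. The following is a minimal sketch; MaxState and running_max are hypothetical names used only for illustration and are not part of flowgraph.

```rust
/// Hypothetical node state (not a flowgraph type): the largest value seen so far.
struct MaxState {
    best: i64,
}

/// Returns a reference either into the input or into the state.
/// `'b: 'a` says the state borrow lasts at least as long as `'a`,
/// so the `&'b mut MaxState` can be reborrowed as a shared `&'a i64`.
fn running_max<'a, 'b: 'a>(input: &'a i64, state: &'b mut MaxState) -> &'a i64 {
    if *input > state.best {
        state.best = *input;
        input // forwarded from the input, no copy of the referent
    } else {
        &state.best // borrowed from the node's own state
    }
}
```

Both branches type-check against the same return type &'a i64, which is what allows a single compute function to mix forwarded and state-backed outputs.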

Interfaces

Inputs / Outputs are an interface tree built from leaves and tuples. Each leaf is generic over a ValueView V (how a value rides the wire); the aliases Port<T> / RefPort<T> / RefPorts<T> name the common scalar case (= …<Scalar<T>>):

  • ViewPort<V> — a single by-value view leaf (see Value ports). Payload: (bool, V::View<'a>). Port<T> (= ViewPort<Scalar<T>>).
  • RefViewPort<V> — a single by-reference leaf. Payload: (bool, &V::View<'a>). RefPort<T> (= RefViewPort<Scalar<T>>) is the plain &T port.
  • RefViewPorts<V> — a runtime-length group of RefViewPorts (a notify plane and a value plane, structure-of-arrays). Payload: (&[bool], &[&V::View<'a>]). RefPorts<T> (= RefViewPorts<Scalar<T>>).
  • () and tuples (A, B, …) up to arity 12 — compound branches.

For runtime-length groups, the length must remain fixed after the first call, so that we have a well-defined static computation graph. Violations will be caught and panic at runtime.
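
As a rough illustration of the structure-of-arrays payload and the fixed-length rule, here is a sketch in plain Rust. It assumes the payload for a group of i64 scalars has the shape (&[bool], &[&i64]); sum_notified and LenGuard are hypothetical helpers, not library items.

```rust
/// Sums the entries whose notify flag is set.
/// `flags` is the notify plane, `values` the value plane (assumed shapes).
fn sum_notified(flags: &[bool], values: &[&i64]) -> i64 {
    assert_eq!(flags.len(), values.len(), "planes must have equal length");
    let mut total = 0;
    for (notified, value) in flags.iter().zip(values) {
        if *notified {
            total += **value;
        }
    }
    total
}

/// Hypothetical guard: remembers the group length from the first call
/// and panics if a later call reports a different length.
struct LenGuard(Option<usize>);

impl LenGuard {
    fn check(&mut self, len: usize) {
        match self.0 {
            None => self.0 = Some(len),
            Some(expected) => assert_eq!(expected, len, "group length changed"),
        }
    }
}
```

A consumer written this way touches only the entries that were notified, and the guard mirrors the kind of runtime check described above.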

Producing a RefPorts output requires creating an array of references with lifetime 'a matching the inputs. This makes forwarding input references simple, but it is awkward for a node that computes its own values: the references cannot point to local variables, and they cannot be stored in the state either (State must be 'static, and &'b mut State is invariant in State, so 'a references cannot be placed there; if they could, they might dangle on the next generation).

To address this difficulty, the library provides Arena: a per-node bump arena which can be stored in node states. Note that allocated objects are never dropped; they are simply overwritten in place on the next generation. Implementing proper dropping would introduce overhead and complexity.

Notifications

Each port carries a bool notify flag alongside its value or reference, indicating whether the value counts as a notification (a change) in this generation. A source node whose value was not modified in a generation has its flag set to false (no notify), and a compute node can set its output flag to false to signal that downstream processing may be skipped.

use flowgraph::typed::*;

struct Abs;

impl Segment for Abs {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = i64; // Remembers the previous output

    fn init(self) -> i64 { 0 }

    fn compute<'a, 'b: 'a>((_, x): (bool, i64), state: &'b mut i64, _: bool) -> (bool, i64) {
        let new = x.abs();
        let changed = new != *state; // Notify only when |x| actually changed
        *state = new;
        (changed, new)
    }
}

The Operator trait is a thin wrapper over Segment: it splits the compute function into two paths, an actual processing path and a passthrough path, and chooses the latter when all input notify flags indicate no change.
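
The two-path idea can be sketched without the library as follows. TwoPath, its method names, and the choice to force the processing path on the init call are assumptions made for illustration; the actual Operator trait may differ.

```rust
/// Hypothetical stand-in for the two-path idea, not the library's trait.
trait TwoPath {
    type State;
    /// Full path: the input carries a notification.
    fn process(x: i64, state: &mut Self::State) -> (bool, i64);
    /// Cheap path: nothing changed, re-expose the previous output.
    fn passthrough(state: &Self::State) -> i64;
}

/// Dispatches on the input notify flag, as a wrapper over `compute` might.
/// Assumption: the first (init) call always takes the processing path.
fn compute<Op: TwoPath>(input: (bool, i64), state: &mut Op::State, init: bool) -> (bool, i64) {
    let (notified, x) = input;
    if init || notified {
        Op::process(x, state)
    } else {
        (false, Op::passthrough(state))
    }
}

struct Square;

impl TwoPath for Square {
    type State = (i64, u32); // (cached output, number of process calls)

    fn process(x: i64, state: &mut Self::State) -> (bool, i64) {
        state.0 = x * x;
        state.1 += 1;
        (true, state.0)
    }

    fn passthrough(state: &Self::State) -> i64 {
        state.0
    }
}
```

In this sketch the passthrough path only reads the state, so it cannot move or resize anything a consumer might still be pointing at.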

The graph executor may also choose to skip some nodes completely if no upstream source node is modified.
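
One way to picture which nodes can be skipped is a reachability pass from the modified sources. The sketch below is a generic breadth-first search over an adjacency list, not the executor's actual algorithm; dirty_cone and the edge representation are assumptions.

```rust
use std::collections::VecDeque;

/// Returns, for each node, whether it is a modified source or lies
/// downstream of one. `edges[i]` lists the consumers of node `i`.
fn dirty_cone(edges: &[Vec<usize>], modified: &[usize]) -> Vec<bool> {
    let mut dirty = vec![false; edges.len()];
    let mut queue: VecDeque<usize> = VecDeque::new();
    for &s in modified {
        if !dirty[s] {
            dirty[s] = true;
            queue.push_back(s);
        }
    }
    // Breadth-first walk along producer -> consumer edges.
    while let Some(n) = queue.pop_front() {
        for &m in &edges[n] {
            if !dirty[m] {
                dirty[m] = true;
                queue.push_back(m);
            }
        }
    }
    dirty
}
```

Nodes left unmarked have no modified source upstream, so their previous outputs are still current and they need not run in that generation.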

Segment fusion

Multi-threaded execution has a cost: every node is a thread-pool task, and waking a worker to run a trivial node can take much longer than the computation inside the node itself. Therefore, it is desirable to fuse tiny segments together into a single node. (Fusing heavy nodes is equally possible, but doing so prevents parallel execution.)

The library provides combinator methods to fuse segments into larger segments:

Combinator                 Method      Meaning
Id<T>                      -           Identity: outputs are inputs unchanged.
Comp(f, g)                 f.then(g)   Composition: outputs g(f(x)).
Left<T, U> / Right<T, U>   -           Projection: outputs the first / second element of a pair.
Fork(f, g)                 f.fork(g)   Fan-out: feed the same input to both, then pair outputs.
Par(f, g)                  f.par(g)    Parallel composition: run f, g on a pair of inputs, then pair outputs. Equivalent to Fork(Comp(Left, f), Comp(Right, g)).
Arr::new(|values, _| …)    -           Applies a stateful closure to the inputs.
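
The algebra behind these combinators can be tried out on ordinary closures. The sketch below models stateless segments as plain functions; comp, fork, par, left, and right are hypothetical free functions that mirror the meanings listed above, not the library's types.

```rust
/// Composition: outputs g(f(x)).
fn comp<A, B, C>(f: impl Fn(A) -> B, g: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |x| g(f(x))
}

/// Fan-out: feed the same input to both, then pair the outputs.
fn fork<A: Copy, B, C>(f: impl Fn(A) -> B, g: impl Fn(A) -> C) -> impl Fn(A) -> (B, C) {
    move |x| (f(x), g(x))
}

/// Parallel composition: run f and g on a pair of inputs, then pair the outputs.
fn par<A, B, C, D>(f: impl Fn(A) -> C, g: impl Fn(B) -> D) -> impl Fn((A, B)) -> (C, D) {
    move |(a, b)| (f(a), g(b))
}

/// Projection onto the first element of a pair.
fn left<T, U>((t, _): (T, U)) -> T {
    t
}

/// Projection onto the second element of a pair.
fn right<T, U>((_, u): (T, U)) -> U {
    u
}
```

With inc = |x: i64| x + 1 and dbl = |x: i64| x * 2, calling par(inc, dbl)((3, 5)) and fork(comp(left, inc), comp(right, dbl))((3, 5)) both yield (4, 10), matching the equivalence stated for Par.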

However, point-free combinators get unreadable fast. As an alternative, the library also provides the segment! macro, a DSL that compiles down to combinators, similar to the Arrow notation in Haskell [1]:

use flowgraph::typed::*;
use flowgraph::segment;

// A stateless adder with two inputs.
struct Add;

impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>(
        ((_, a), (_, b)): ((bool, i64), (bool, i64)),
        _: &'b mut (),
        _: bool,
    ) -> (bool, i64) {
        (true, a + b)
    }
}

let mut b = GraphBuilder::new();
let s = b.push_source(Source::new(1));
let t = b.push_source(Source::new(2));

// One fused node computing: c = b + a; d = c + a; e = d + a; result (e, d, c).
// Type annotation is needed on inputs and outputs.
let seg = segment!(|a: Port<i64>, b: Port<i64>| -> (Port<i64>, Port<i64>, Port<i64>) {
    let c = (b, a) => Add;
    let d = (c, a) => Add;
    let e = (d, a) => Add;
    (e, d, c)
});

let (e, d, c) = b.push(seg, (*s, *t));

Safety

The library uses unsafe internally, but the typed API is safe to use. For more details, the following runtime invariants are maintained internally:

  • Single writer per slot. Each output slot has exactly one producing node; each input slot is scattered into by exactly that one producer. Concurrent computes write disjoint slots.
  • Per-generation lifetime. Output pointers stay valid as long as inputs and state are unchanged; passthrough's &State forbids reallocation, keeping out-of-cone consumers' pointers live across generations.
  • Read guard. Reading a slot after poking a source but before stabilize panics rather than dereferencing a possibly-stale forwarded pointer.
  • Poison on panic. If a compute panics, downstream slots may hold dangling forwarded pointers, so the graph is poisoned: the pool catches the panic, settles the batch (no hang), re-raises out of stabilize, and every later stabilize or slot read panics. There is no recovery — treat the graph as dead. For recoverable failures, make the failure a value (e.g. a Result<T, E> cell) instead of panicking.
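
The poison-on-panic behaviour can be approximated in a few lines of standard Rust. This is only a sketch of the pattern using std::panic; Poisonable is a hypothetical single-value wrapper and says nothing about how the graph or pool implement it internally.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical wrapper, not the library's graph type.
struct Poisonable {
    value: i64,
    poisoned: bool,
}

impl Poisonable {
    fn new(value: i64) -> Self {
        Poisonable { value, poisoned: false }
    }

    /// Runs `step` on the value. If it panics, marks the wrapper as
    /// poisoned and re-raises the panic to the caller.
    fn update(&mut self, step: impl FnOnce(i64) -> i64) {
        assert!(!self.poisoned, "poisoned by an earlier panic");
        let old = self.value;
        match panic::catch_unwind(AssertUnwindSafe(|| step(old))) {
            Ok(new) => self.value = new,
            Err(payload) => {
                self.poisoned = true;
                panic::resume_unwind(payload);
            }
        }
    }

    /// Reads the value; panics once the wrapper is poisoned.
    fn read(&self) -> i64 {
        assert!(!self.poisoned, "poisoned by an earlier panic");
        self.value
    }
}

/// The recoverable alternative: report the failure as a value.
fn checked_div(a: i64, b: i64) -> Result<i64, String> {
    if b == 0 {
        Err("division by zero".to_string())
    } else {
        Ok(a / b)
    }
}
```

After a panicking update, every later update or read on the wrapper panics, whereas a node that returns something like checked_div's Result leaves the structure usable.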

Tests have been run under Miri to check for memory-safety violations and common forms of undefined behavior.

Footnotes

  1. https://www.haskell.org/arrows/syntax.html
