Implementations

Simplest Data Pipeline

Create an array of closures.

Create a runner function which will loop over the function pointers and then feed the result of first to the second, the second to the third and so on ….

This creates a very simple linear pipeline.

Pure Data

Reactive implementation.

npm install --save eventemitter2

A normal node has default ports to help with most common use cases, like pipes and stateful nodes.

In SessionNode the Node is executed when the all the data is available, so the order of arrival of data at each individual port can be asynchronous. This is also an excellent point to implement promise-resolution like behaviour for real concurrency. Useful to maintain complex state. Single User vs Multiple Users.

You can use private state in Node to write complex if / else logic to simulate co-routine like concurrency.

You can also extend to the connection class to implement typechecking related code. This model is used in Kelp.

constEventEmitter = require('eventemitter2');

function Message(data) {
    return {...data};
}

function Connection(source, outlet, sink, inlet) {
    var self = {};
    self.source = source;
    self.outlet = outlet;
    self.sink = sink;
    self.inlet = inlet;
    self.transmit = function(message) {
        sink.process(inlet, message);
    }
    return self;
}

function Node(id, graph) {
    var self = {}
    self.id = id;
    self.graph = graph;
    self.inlets = {
        "input": [],
        "control": []
    };
    self.outlets = {
        "output": [],
        "log": [],
        "debug": [],
        "error": []
    };

    self.transmit = function (outlet, message) {
        self.outlets[outlet].map(function (connection) {
            connection.transmit(Message(message));
        })
    }

    self.process = function (inlet, message) {
    }

    self.init = function () {
    }

    self.start = function () {
    }

    self.stop = function () {
    }

    return self;

}

function SessionNode(id, graph) {
    var node = Node(id, graph);
    var sesssionStash = {};

    node.process = function (inlet, message) {
        if (message.session) {
            if (sesssionStash[message.session]) {
                sesssionStash[message.session].push([inlet, message]);
            } else {
                sesssionStash[message.session] = [[inlet, message]];
            }
        }

        totalInlets = Object.Keys(node.inlets).length;
        if (totalInlets == sesssionStash[message.session].length) {
            node.processAll(sesssionStash[message.session]);
        }

    }

    node.processAll = function (messages)  {
    }

}



function Component(id, graph) {
    var self = {}
    self.id = id;
    self.graph = graph;

    self.emit = function (message) {
        self.graph.emitter.emit("component:" + self.id, Message(message));
    }

    self.process = function (message) {
    }

    self.init = function () {
    }

    self.start = function () {
    }

    self.stop = function () {
    }

    return self;

}

function Graph() {
    var self = {};
    self.emitter = new EventEmitter();
    self.gensym = 0;
    self.nodes = {
    }
    self.components = {
    }
    self.createSession = function() {
        return 'gensym:' + (self.gensym++);
    }
    self.context = {};
    self.save = function () {
        // to json
    }
    self.load = function () {
        // from json
    }
    self.addNode = function (id, nodeFunction, args) {
        const node = nodeFunction(id, self);
        node.init.apply(null, args);
        self.nodes[id] = node;
    }
    self.removeNode = function(id) {
        const node = self.node[id];
        node.stop();
        delete self.node[id];
    }
    self.registerComponent = function(id, channel, componentFunction, args) {
        const component = componentFunction(id, self);
        self.components[id] = component;
        component.init.apply(null, args);
        self.emitter.on(channel, function () {
            self.components[id].process.apply(null, arguments);
        })
    }
    self.unregisterComponent = function (id) {
        const component = self.components[id];
        component.stop();
        delete self.components[id];
    }
    self.connect = function (source, outlet, sink, inlet) {
        self.nodes[source].outlets[outlet].push(Connection(self.nodes[source], outlet, self.nodes[sink], inlet));
    }
    self.startAll = function() {
        for (const [key, value] of Object.entries(self.nodes)) {
            value.start();
        }
        for (const [key, value] of Object.entries(self.components)) {
            value.start();
        }
    }
    self.stopAll = function() {
        for (const [key, value] of Object.entries(self.nodes)) {
            value.stop();
        }
        for (const [key, value] of Object.entries(self.components)) {
            value.stop();
        }
    }
    return self;
}

Example Code

function Adder(id, graph) {
    var node = Node(id, graph);
    node.process = function (inlet, messsage) {
        node.transmit("output", {"data": 5 + messsage.data});
    }
    return node;
}

function Dumper(id, graph) {
    var base = Node(id, graph);
    base.process = function (inlet, message) {
        base.graph.emitter.emit("tracing", Message(message));
    }
    return base;
}

function Tracer(id, graph) {
    var base = Component(id, graph);
    base.process = function (message) {
        console.log("Tracing: " + message.data);
    }
    return base;
}

var g = Graph();
g.addNode("main", Adder);
g.addNode("dumper", Dumper);
g.registerComponent("tracer", "tracing", Tracer);
g.connect("main", "output", "dumper", "input");
g.nodes["main"].process("foo", Message({"data": 10}))

Firing Rules

https://github.com/noflo/noflo/blob/master/src/lib/Helpers.coffee

FBP

FBP is very generic and allows for Asynchronous, out of order arrival at the input ports.

It can be implemented by

  • Co-routines

  • Fibers

  • Threads

  • Async / Await

The main engine implemented by Paul Morrison uses wait/notify pattern with threads.

See https://github.com/jpaulm/javafbp/blob/master/src/main/java/com/jpaulmorrison/fbp/core/engine/Connection.java

Inside the Node, every time port.read() called it changes it’s state from Running to Waiting.

Rough Logic,

Is the Node in Running State ?
     If so, then push data into a waiting Queue
Is the Node in Ready State ?
    If so then push the data into the Node.
Is the Node in Waiting State ?
   What is the port that the Node is waiting for ?
        Is the New Data for the waiting port ?
            If so, then push data into the Node and change the Node to Running State.
         Is the New Data not for the waiting port ?
            If so, then push data into a waiting Queue

Sample go,

package main;

import (
    "fmt"
    "sync"
)

type ISystem interface {
    Process()
    Connect(input chan interface{}, output chan interface{}) 
    Transmit()
}

type System struct{
    Connections []func()
}

func (b *System) Connect(input chan interface{}, output chan interface{}) {
    b.Connections = append(b.Connections , func () {
        val := <- input
        output <- val
    })

}

func (b *System) Transmit()  {
    conns := b.Connections;

    for _,conn := range conns {
        go conn()
    }
}


type AdderSystem struct {
    System
    in1 chan interface{}
    in2 chan interface{}
    out chan interface{}
}

type PrinterSystem struct {
    System
    in1 chan interface{}
}


func (b AdderSystem) Process() {
    for {
        val1 := <-b.in1;
        val2 := <-b.in2;
        b.out  <- (val1.(int) + val2.(int));
    }
}



func (b PrinterSystem) Process() {
    for {
        val := <-b.in1
        fmt.Println(val)
    }
}

func Run(systems []ISystem) sync.WaitGroup {
    var wg sync.WaitGroup
    wg.Add(len(systems))
    for i:=0; i < len(systems); i++ {
        sys := systems[i]
        go func() {
            sys.Process();
            wg.Done();
        }()
    }
    for _, sys := range systems {
        sys.Transmit()
    }

    return wg
}

func main() {
    as := AdderSystem{in1: make(chan interface{}), in2: make(chan interface{}), out: make(chan interface{})}
    p := PrinterSystem{in1: make(chan interface{})}
    as.Connect(as.out, p.in1)
    systems := []ISystem{&as, &p}
    wg := Run(systems)
    as.in1 <- 1;
    as.in2 <- 2;
    wg = Run(systems)
    wg.Wait()

    as.in1 <- 4;
    as.in2 <- 2;
    wg.Wait()
}