Components
Components are basic building blocks in flow-based programming. A component is a single entity with a single purpose that processes incoming data and passes the results down the stream.
In GoFlow a component is a struct that contains at least one input port (bidirectional or receive-only channel):
type SimpleComponent struct {
	In <-chan PacketType
}
Can a component have no input ports? In theory yes, but it's not very practical, because in this case a network has no way to signal the process that it has to stop.
Ports are windows to the outer world for a component. In GoFlow, a component's ports are channel fields in its struct. There are 2 types of ports: input ports (inports) and output ports (outports). The following example demonstrates how they can be declared:
type PortExample struct {
	Foo <-chan int  // receive-only inport
	Bar chan<- int  // send-only outport
	Boo chan string // can be inport or outport
}
Ports must be exported fields (their names must start with a capital letter), otherwise GoFlow won't be able to detect them.
There are no specific limitations on the data types of port channels. This means that you can also pass channels through the ports. If you need to pass more than one data element to a component at the same time, it is recommended to pass tuples and structures instead of trying to pass them through multiple ports. Multiple inports should be used for information packets which arrive from different sources or at different times. The number of ports per component is not limited, but it is recommended to keep them few, otherwise the component's cohesion and logical consistency suffer.
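As a minimal sketch of the advice above, two values that always arrive together can travel as one struct packet through a single inport. The Point type, the Summer component and the runSummer helper are hypothetical names used only for illustration, and the process is driven with plain channels here rather than a GoFlow network.

```go
package main

import "fmt"

// Point is a hypothetical packet type bundling two values
// that always arrive together.
type Point struct {
	X, Y int
}

// Summer is a hypothetical component with one struct-typed inport
// instead of two separate int inports.
type Summer struct {
	In  <-chan Point
	Out chan<- int
}

// Process adds the two coordinates of every incoming packet.
func (c *Summer) Process() {
	for p := range c.In {
		c.Out <- p.X + p.Y
	}
}

// runSummer wires the component to plain channels and collects the results.
func runSummer(points []Point) []int {
	in := make(chan Point)
	out := make(chan int)
	s := &Summer{In: in, Out: out}

	go func() {
		s.Process()
		close(out) // closed by hand because no network is involved here
	}()
	go func() {
		for _, p := range points {
			in <- p
		}
		close(in)
	}()

	var sums []int
	for v := range out {
		sums = append(sums, v)
	}
	return sums
}

func main() {
	fmt.Println(runSummer([]Point{{1, 2}, {3, 4}}))
}
```

Because both coordinates sit in the same packet, the component never has to decide which X belongs to which Y.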
The behaviour of a component is driven by events. In the GoFlow world an event is when something happens to one of the ports. Currently the following events are supported:
- Incoming packet received: there is a new information packet (data element) received on an input port.
- Connection closed: the channel associated with an inport has been closed by the sender or the network.
Early implementations of GoFlow also supported another event on outports - outgoing packet sent - and it sent outgoing packets in non-blocking mode, but it was stripped out because it isn't really necessary and it increases program complexity.
Events are independent of each other, both on the same port and across different ports, with one exception: no packets can be received on a port that has already been closed.
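In plain Go terms, the two supported events map onto the two outcomes of a channel receive. The sketch below uses the two-value receive form to tell them apart. The describeEvents helper is a hypothetical name and is not part of GoFlow.

```go
package main

import "fmt"

// describeEvents reads from an inport until it is closed and records
// which event was observed at each step.
func describeEvents(in <-chan int) []string {
	var events []string
	for {
		v, ok := <-in
		if !ok {
			// Connection closed: no more packets can arrive on this port.
			events = append(events, "closed")
			return events
		}
		// Incoming packet received.
		events = append(events, fmt.Sprintf("packet %d", v))
	}
}

func main() {
	in := make(chan int, 2)
	in <- 1
	in <- 2
	close(in)
	fmt.Println(describeEvents(in))
}
```

Packets that were sent before the channel was closed are still delivered, and the closed event is observed only after them.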
Components are reusable classes; they don't exist at run time. The actual job is done by their instances, called Processes. There can be many processes of the same component, just as there can be many objects of the same class.
Process is also the name of the function that defines how a component behaves. Consider an example:
type Echo struct {
	In  <-chan int
	Out chan<- int
}
func (c *Echo) Process() {
	for i := range c.In {
		c.Out <- i
	}
}
In this example we have a simple Echo component. Its Process function reads incoming packets and sends them to the output port.
Process functions don't return any values. If they need to yield some results they should send them through outports instead.
The order of data processing depends on the logic that is implemented in the process function. By default it's a synchronous reader/writer, but you can use goroutines to implement parallel processing of packets within the same process.
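One way to process packets in parallel inside a single process is to start a goroutine per packet and wait for all of them before returning. This is only a sketch under that assumption: Squarer and squareAll are hypothetical names, and plain channels stand in for a GoFlow network.

```go
package main

import (
	"fmt"
	"sync"
)

// Squarer is a hypothetical component that handles each packet
// in its own goroutine.
type Squarer struct {
	In  <-chan int
	Out chan<- int
}

// Process spawns one goroutine per incoming packet and returns only
// after all of them have sent their results.
func (c *Squarer) Process() {
	var wg sync.WaitGroup
	for i := range c.In {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.Out <- n * n
		}(i)
	}
	wg.Wait()
}

// squareAll drives the process with plain channels and returns the sum
// of the results. Output order is not preserved, so the results are
// summed rather than compared one by one.
func squareAll(nums []int) int {
	in := make(chan int)
	out := make(chan int)
	s := &Squarer{In: in, Out: out}

	go func() {
		s.Process()
		close(out) // closed by hand because no network is involved here
	}()
	go func() {
		for _, n := range nums {
			in <- n
		}
		close(in)
	}()

	total := 0
	for v := range out {
		total += v
	}
	return total
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3}))
}
```

The trade-off is that results may leave the outport in a different order than the packets arrived, so this approach suits packets that can be handled independently.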
The fact that events on different ports of the same component are independent means that if you need 2 or more inports to be synchronized, you may need to provide synchronization yourself (for example, using queues).
The example below is a repeater component that reads two inputs: one tells which word to repeat, another one is the number of times the word needs to be repeated:
type Repeater struct {
	Word  <-chan string
	Times <-chan int
	Words chan<- string
}
func (c *Repeater) Process() {
	// The input guard keeps track of which inports have been closed
	guard := goflow.NewInputGuard("word", "times")
	times := 0
	word := ""
	// The main select-loop
	for {
		// select listens on all available inports
		select {
		case t, ok := <-c.Times:
			if ok {
				// Received data
				times = t
				// Calling the function that contains the actual logic
				c.repeat(word, times)
			} else if guard.Complete("times") {
				// ok is false, meaning that the port is closed.
				// guard.Complete() flags this port as completed and
				// returns true once every guarded port is completed
				return
			}
		case w, ok := <-c.Word:
			if ok {
				word = w
				c.repeat(word, times)
			} else if guard.Complete("word") {
				return
			}
		}
	}
}
// repeat contains the actual logic that uses provided inputs
func (c *Repeater) repeat(word string, times int) {
	// Do nothing until both inputs have been provided
	if word == "" || times <= 0 {
		return
	}
	for i := 0; i < times; i++ {
		c.Words <- word
	}
}
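The repeater combines whatever word and count it has seen most recently. Where packets from two inports must instead be matched one-to-one, the queue-based synchronization mentioned earlier could look roughly like the sketch below. Zipper, Pair and zip are hypothetical names, and plain channels stand in for a GoFlow network.

```go
package main

import "fmt"

// Pair is a hypothetical packet joining one value from each inport.
type Pair struct {
	Word  string
	Times int
}

// Zipper is a hypothetical component that matches the n-th word with
// the n-th count, whatever order they arrive in.
type Zipper struct {
	Word  <-chan string
	Times <-chan int
	Out   chan<- Pair
}

func (c *Zipper) Process() {
	var words []string // queue of unmatched words
	var times []int    // queue of unmatched counts
	wordIn, timesIn := c.Word, c.Times
	for wordIn != nil || timesIn != nil {
		select {
		case w, ok := <-wordIn:
			if !ok {
				wordIn = nil // a nil channel is never selected again
				continue
			}
			words = append(words, w)
		case t, ok := <-timesIn:
			if !ok {
				timesIn = nil
				continue
			}
			times = append(times, t)
		}
		// Emit pairs while both queues have something to offer.
		for len(words) > 0 && len(times) > 0 {
			c.Out <- Pair{Word: words[0], Times: times[0]}
			words, times = words[1:], times[1:]
		}
	}
}

// zip drives the process with plain channels and collects the pairs.
func zip(ws []string, ts []int) []Pair {
	wordCh := make(chan string)
	timesCh := make(chan int)
	out := make(chan Pair)
	z := &Zipper{Word: wordCh, Times: timesCh, Out: out}

	go func() {
		z.Process()
		close(out) // closed by hand because no network is involved here
	}()
	go func() {
		for _, w := range ws {
			wordCh <- w
		}
		close(wordCh)
	}()
	go func() {
		for _, t := range ts {
			timesCh <- t
		}
		close(timesCh)
	}()

	var pairs []Pair
	for p := range out {
		pairs = append(pairs, p)
	}
	return pairs
}

func main() {
	fmt.Println(zip([]string{"a", "b"}, []int{1, 2}))
}
```

Setting the local channel variable to nil after a close keeps the select from spinning on the closed port. Leftover items in either queue are dropped when both inports are closed, which is a design choice of this sketch.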
Processes are usually created using Go's new() function:
proc := new(ComponentType)
If a component has some fields that need more complex construction, e.g. pointers which must be allocated, you should provide a constructor function instead:
func NewCustomType() *CustomType {
	p := new(CustomType)
	p.ptr = new(SomePointer)
	p.flag = 123
	return p
}
Notice that processes are always referenced and passed by pointers. This is a requirement.
In the v0 branch of GoFlow there were special callbacks for lifecycle management. Starting with v1, all initialization and cleanup can be added inside the Process function itself, just before or after the main loop that handles input/output.
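A minimal sketch of that layout follows: setup before the loop, the loop itself, then cleanup once the inport has been closed. Counter and countPackets are hypothetical names, and plain channels stand in for a GoFlow network.

```go
package main

import "fmt"

// Counter is a hypothetical component that counts packets and reports
// the total once its inport is closed.
type Counter struct {
	In    <-chan string
	Total chan<- int
}

func (c *Counter) Process() {
	// Initialization: runs once, before the first packet is read.
	count := 0

	// Main loop: ends when the inport is closed.
	for range c.In {
		count++
	}

	// Cleanup: runs once, after the inport has been closed.
	c.Total <- count
}

// countPackets drives the process with plain channels.
func countPackets(items []string) int {
	in := make(chan string)
	total := make(chan int, 1)
	c := &Counter{In: in, Total: total}
	go c.Process()
	for _, it := range items {
		in <- it
	}
	close(in)
	return <-total
}

func main() {
	fmt.Println(countPackets([]string{"a", "b", "c"}))
}
```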
It is assumed that component fields other than ports describe its state. They can be of any type, and either exported or private; GoFlow doesn't care. The following example demonstrates a component with several state fields:
type StateExample struct {
	// ports
	In1 <-chan Envelope // inport
	In2 <-chan Stamp    // inport
	Out chan<- Letter   // outport
	// state
	counter int       // private field
	buffer  [32]Stamp // private field
	Status  string    // public field
}
If a process in the above example accesses the state fields concurrently (e.g. by spawning some goroutines), you need to protect that state data from race conditions.
A typical way to do that is by using a *sync.Mutex from the sync package of the standard library.
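The following sketch shows one way this might look. Tally and runTally are hypothetical names, and plain channels stand in for a GoFlow network. Note that the sketch stores the mutex by value rather than as a *sync.Mutex. That is an assumption that works here because processes are always referenced by pointer, so the mutex is never copied.

```go
package main

import (
	"fmt"
	"sync"
)

// Tally is a hypothetical component whose counter is updated from
// several goroutines at once.
type Tally struct {
	In <-chan int

	mu      sync.Mutex // guards counter
	counter int
}

// Process adds every incoming number to the counter, one goroutine
// per packet, and returns after all of them have finished.
func (c *Tally) Process() {
	var wg sync.WaitGroup
	for n := range c.In {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.mu.Lock()
			c.counter += n
			c.mu.Unlock()
		}(n)
	}
	wg.Wait()
}

// Count returns the current counter value under the lock.
func (c *Tally) Count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counter
}

// runTally drives the process with a plain channel.
func runTally(nums []int) int {
	in := make(chan int)
	t := &Tally{In: in}
	done := make(chan struct{})
	go func() {
		t.Process()
		close(done)
	}()
	for _, n := range nums {
		in <- n
	}
	close(in)
	<-done
	return t.Count()
}

func main() {
	fmt.Println(runTally([]int{1, 2, 3, 4}))
}
```

Without the lock, the concurrent increments would race with each other and the final count could be wrong. Running the program with Go's race detector (go run -race) is a quick way to check for such problems.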