Components
Components are basic building blocks in flow-based programming. A component is a single entity with a single purpose that processes incoming data and passes the results down the stream.
In GoFlow, a component is a struct that embeds flow.Component and contains at least one input port (a bidirectional or receive-only channel):
type SimpleComponent struct {
	flow.Component
	In <-chan PacketType
}
flow.Component is required to attach components to graphs.
Ports are a component's windows to the outer world. In GoFlow, component ports are channel fields in the component's struct. There are two types of ports: input ports (inports) and output ports (outports). The following example demonstrates how they can be declared:
type PortExample struct {
	flow.Component
	Foo <-chan int  // receive-only inport
	Bar chan<- int  // send-only outport
	Boo chan string // can be inport or outport
}
Ports must be exported fields (their names must start with a capital letter), otherwise GoFlow won't be able to detect them.
There are no specific limitations on the data types of port channels, which means that you can also pass channels through the ports. If you need to pass more than one data element to a component at the same time, it is recommended to pass tuples or structures instead of spreading them across multiple ports. Multiple inports should be used for information packets that arrive from different sources or at different times. The number of ports per component is not limited, but it is recommended to keep it small, otherwise the component's cohesion and logical consistency suffer.
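The recommendation to bundle related values into one packet can be sketched in plain Go, without the GoFlow runtime. The Order type and the total function are hypothetical names used only for illustration:

```go
package main

import "fmt"

// Order is a hypothetical packet type that bundles values which must
// arrive together, instead of sending each value through its own port.
type Order struct {
	ID       int
	Quantity int
	Price    float64
}

// total consumes complete Order packets from a single receive-only channel
// and sends one result per packet. It closes out once in is closed.
func total(in <-chan Order, out chan<- float64) {
	defer close(out)
	for o := range in {
		out <- float64(o.Quantity) * o.Price
	}
}

func main() {
	in := make(chan Order)
	out := make(chan float64)
	go total(in, out)

	go func() {
		in <- Order{ID: 1, Quantity: 3, Price: 2.5}
		in <- Order{ID: 2, Quantity: 2, Price: 10}
		close(in)
	}()

	// Prints 7.5, then 20.
	for t := range out {
		fmt.Println(t)
	}
}
```

Because quantity and price travel in the same packet, the receiver never has to guess which price belongs to which quantity.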
The behavior of a component is driven by events. In the GoFlow world, an event is something that happens to one of the ports. Currently the following events are supported:
- Incoming packet received: there is a new information packet (data element) received on an input port.
- Connection closed: the channel associated with an inport has been closed by the sender or the network.
Early implementations of GoFlow also supported another event on outports - outgoing packet sent - and it sent outgoing packets in non-blocking mode, but it was stripped out because it isn't really necessary and it increases program complexity.
Events are independent of each other, whether they occur on the same port or on different ports. The only exception is that no packets can be received on a port that is already closed.
Components handle events on their ports using methods. Handler methods are named after the ports and events they handle. Depending on the type of event, there are two possible method signatures:
// Incoming packet handler
func (c *ComponentType) OnPortName(argName ChanElementType)
// Channel close event handler
func (c *ComponentType) OnPortNameClose()
Here PortName is the name of the port field in the ComponentType struct and ChanElementType is the element type of the port's channel.
Handler functions don't return any values. If they need to yield some results they should send them through outports instead.
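To make the naming convention concrete, here is a minimal sketch that uses Go's reflect package to check which handlers exist for a given port name. It illustrates the convention only and is not necessarily how GoFlow discovers handlers internally. Doubler and handlerNames are hypothetical names:

```go
package main

import (
	"fmt"
	"reflect"
)

// Doubler is a hypothetical component-like struct with one inport and one
// outport. It does not embed flow.Component so the sketch stays standalone.
type Doubler struct {
	In  <-chan int
	Out chan<- int
}

// OnIn follows the "On" + port name convention for packet handlers.
func (d *Doubler) OnIn(n int) { d.Out <- n * 2 }

// OnInClose follows the "On" + port name + "Close" convention.
func (d *Doubler) OnInClose() { close(d.Out) }

// handlerNames returns the conventional handler method names that are
// actually defined on c for the given port.
func handlerNames(c interface{}, port string) []string {
	var found []string
	v := reflect.ValueOf(c)
	for _, name := range []string{"On" + port, "On" + port + "Close"} {
		if v.MethodByName(name).IsValid() {
			found = append(found, name)
		}
	}
	return found
}

func main() {
	fmt.Println(handlerNames(new(Doubler), "In"))  // [OnIn OnInClose]
	fmt.Println(handlerNames(new(Doubler), "Out")) // []
}
```

Note that the lookup is done on a pointer, because both handlers have pointer receivers.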
Events in GoFlow are asynchronous and handlers are called asynchronously, each handler call in its own goroutine. This means that if two packets arrive at inports at the same time, the system will try to handle them concurrently. It also means that if event B follows event A for a process C, C will not necessarily finish handling them in that order.
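The effect of running each handler call in its own goroutine can be sketched in plain Go. This is an illustration of the dispatch model described above, not GoFlow's actual code, and handleAll is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
)

// handleAll starts one goroutine per incoming packet, mimicking asynchronous
// handler dispatch, and records the order in which the handlers finish.
func handleAll(in <-chan int) []int {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		finished []int
	)
	for n := range in {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			mu.Lock() // the shared slice needs its own protection
			finished = append(finished, n)
			mu.Unlock()
		}(n)
	}
	wg.Wait()
	return finished
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	// All five packets are handled, but the completion order may differ
	// from the arrival order and may vary between runs.
	fmt.Println(handleAll(in))
}
```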
Because events on different ports of the same component are independent, you have to provide synchronization yourself (for example, using queues) if you need two or more inports to be synchronized.
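One possible way to synchronize two inputs with queues is sketched below in plain Go. Pair and zip are hypothetical names. The sketch pairs packets by arrival order on each input and drops any unpaired leftovers once both inputs are closed, which may or may not suit a given application:

```go
package main

import "fmt"

// Pair holds one item from each input, matched by arrival order.
type Pair struct {
	Left  string
	Right int
}

// zip buffers packets from both inputs in queues and emits a Pair as soon as
// each queue holds at least one item. It closes out when both inputs are closed.
func zip(left <-chan string, right <-chan int, out chan<- Pair) {
	defer close(out)
	var lq []string
	var rq []int
	for left != nil || right != nil {
		select {
		case l, ok := <-left:
			if !ok {
				left = nil // a nil channel is never selected again
				continue
			}
			lq = append(lq, l)
		case r, ok := <-right:
			if !ok {
				right = nil
				continue
			}
			rq = append(rq, r)
		}
		// Emit pairs while both queues are non-empty.
		for len(lq) > 0 && len(rq) > 0 {
			out <- Pair{Left: lq[0], Right: rq[0]}
			lq, rq = lq[1:], rq[1:]
		}
	}
}

func main() {
	left := make(chan string)
	right := make(chan int)
	out := make(chan Pair)
	go zip(left, right, out)
	go func() {
		for _, s := range []string{"a", "b"} {
			left <- s
		}
		close(left)
	}()
	go func() {
		for _, n := range []int{1, 2} {
			right <- n
		}
		close(right)
	}()
	for p := range out {
		fmt.Println(p.Left, p.Right) // prints "a 1", then "b 2"
	}
}
```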
Components are reusable classes that don't exist at run time. The actual job is done by their instances, called Processes. There can be many processes of the same component, just as there can be many objects of the same class.
Processes are usually created using Go's new() function:
proc := new(ComponentType)
If a component has fields that need more complex construction, e.g. pointers which must be allocated, you should provide a constructor function instead:
func NewCustomType() *CustomType {
	p := new(CustomType)
	p.ptr = new(SomePointer)
	p.flag = 123
	return p
}
Notice that processes are always referenced and passed by pointers. This is a requirement.
Standalone processes (not attached to a network) are started explicitly with a non-blocking flow.RunProc() call:
flow.RunProc(p)
Normally processes are contained in networks, so networks start their processes themselves and you don't need to call flow.RunProc(). You only need to run the top-level network.
Starting a process invokes its Init() method if the component implements the flow.Initializable interface. So, if you need to run some code right before a process starts listening on its inports, provide such a method:
func (c *ComponentType) Init() {
	// Your initialization code here
}
An initialization handler is optional and most components omit it.
In GoFlow, processes are persistent and continue running until all of their input ports are closed. The same is true for networks. Thus you should close all inports of a process or network to shut it down.
Once all inports of a process have been closed, it stops handling incoming packets and proceeds to its finalization routine. Similarly to initialization, it will invoke the component's Finish() handler if the component implements the flow.Finalizable interface:
func (c *ComponentType) Finish() {
	// Your finalization code here
}
After that it will automatically close all of its outports. This way flow-based networks shut down automatically, following the data stream.
If you need to override this behavior and avoid automatic closing of outports, you can define a custom Shutdown() handler for a component:
func (c *ComponentType) Shutdown() {
	// Close ports yourself when necessary
}
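The default behavior, where closing the first inport cascades down the stream, can be pictured with a plain-Go pipeline. This is a sketch of the idea using ordinary goroutines rather than GoFlow processes. The names stage and runPipeline are hypothetical:

```go
package main

import "fmt"

// stage forwards transformed packets until its input is closed, then closes
// its own output so that the next stage can shut down too.
func stage(in <-chan int, out chan<- int, f func(int) int) {
	defer close(out)
	for n := range in {
		out <- f(n)
	}
}

// runPipeline wires two stages together, feeds the inputs into the first one
// and returns everything that reaches the end of the pipeline.
func runPipeline(inputs []int) []int {
	a := make(chan int)
	b := make(chan int)
	c := make(chan int)
	go stage(a, b, func(n int) int { return n + 1 })
	go stage(b, c, func(n int) int { return n * 10 })
	go func() {
		for _, n := range inputs {
			a <- n
		}
		close(a) // closing the first inport starts the cascade
	}()
	var results []int
	for n := range c { // this loop ends only because c gets closed
		results = append(results, n)
	}
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3})) // [20 30 40]
}
```

Nothing in runPipeline closes b or c directly. Each stage closes its own output after its input is exhausted, which is the same idea as the automatic closing of outports.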
It is assumed that component fields other than ports describe its state. They can be of any type and can be either exported or private; GoFlow doesn't care. The following example demonstrates a component with several state fields:
type StateExample struct {
	flow.Component
	// ports
	In1 <-chan Envelope // inport
	In2 <-chan Stamp    // inport
	Out chan<- Letter   // outport
	// state
	counter int       // private field
	buffer  [32]Stamp // private field
	Status  string    // public field
}
In GoFlow, event handlers are invoked asynchronously by default. This may lead to race conditions and unexpected program behavior if they modify state fields of the process or access external resources such as disk, console or network.
GoFlow provides a built-in mechanism to make state modification thread-safe for asynchronous processes. If a component contains a StateLock field of type *sync.Mutex, GoFlow will automatically lock it before an event handler is called and unlock it after the event handler returns. Let's add a StateLock to the previous example:
import "sync"
type StateExample struct {
	flow.Component
	// ports
	In1 <-chan Envelope // inport
	In2 <-chan Stamp    // inport
	Out chan<- Letter   // outport
	// lock
	StateLock *sync.Mutex
	// state
	counter int       // private field
	buffer  [32]Stamp // private field
	Status  string    // public field
}
func (p *StateExample) OnIn2(s Stamp) {
	// We can safely modify fields here.
	// buffer is a fixed-size array, so store by index (append only works on slices).
	p.buffer[p.counter%len(p.buffer)] = s
	p.counter++
}
Instead of adding a mutex, you can stop multiple goroutines from modifying the internal state of the process or external resources by switching the process into Synchronous mode. In Synchronous mode a process runs in a single goroutine, and when a new input arrives the process finishes handling the previous one before starting to handle the new one.
To switch a single component instance into Synchronous mode, set the Mode property of the embedded flow.Component:
printer := new(Printer)
// Switching the printer to Sync mode so it doesn't race for the output stream
printer.Component.Mode = flow.ComponentModeSync
You can also switch the entire network into Synchronous mode by setting the flow.DefaultComponentMode variable before running the network:
net := NewApp()
// Switching to Sync by default
flow.DefaultComponentMode = flow.ComponentModeSync
// Running the net and all of its processes in Synchronous mode
flow.RunNet(net)
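What Synchronous mode means for a single process can be sketched in plain Go: one goroutine handles packets one at a time, so state needs no lock and the handling order matches the arrival order. This is a conceptual sketch, not GoFlow's code. The Printer type here is a standalone stand-in with a hypothetical OnLine handler, and runSync is a hypothetical name:

```go
package main

import "fmt"

// Printer is a hypothetical process whose state is touched by one goroutine only.
type Printer struct {
	lines []string
}

// OnLine is the handler. No locking is needed in this model.
func (p *Printer) OnLine(s string) { p.lines = append(p.lines, s) }

// runSync handles packets one at a time in the calling goroutine. The next
// packet is not read until the previous handler call has returned.
func runSync(p *Printer, in <-chan string) {
	for s := range in {
		p.OnLine(s)
	}
}

func main() {
	in := make(chan string, 3)
	in <- "one"
	in <- "two"
	in <- "three"
	close(in)

	p := new(Printer)
	runSync(p, in)
	fmt.Println(p.lines) // [one two three]
}
```

The trade-off is throughput: a slow handler call delays every packet queued behind it.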