Skip to content

Commit

Permalink
Merge pull request #69 from DrmagicE/federation-dev
Browse files Browse the repository at this point in the history
docs(federation): add federation doc
  • Loading branch information
DrmagicE authored Feb 12, 2021
2 parents 1daeb38 + 0aebbda commit 0179b7a
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 67 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-sockaddr v1.0.0
github.com/hashicorp/logutils v1.0.0
github.com/hashicorp/serf v0.9.5
github.com/iancoleman/strcase v0.1.2
Expand Down
213 changes: 213 additions & 0 deletions plugin/federation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# Federation

Federation is a kind of clustering mechanism which provides high-availability and horizontal scaling.
In Federation mode, multiple gmqtt brokers can be grouped together and "act as one".
However, it is impossible to fulfill all requirements in MQTT specification in a distributed environment.
There are some limitations:
1. Persistent session cannot be resumed from another node.
2. Clients with same client id can connect to different nodes at the same time and will not be kicked out.

This is because session information only stores in local node and does not share between nodes.

## Quick Start
The following commands will start a two nodes federation, the configuration files can be found [here](./examples).
Start node1 in Terminal1:
```bash
$ gmqttd start -c path/to/retry_join/node1_config.yml
```
Start node2 in Terminate2:
```bash
$ gmqttd start -c path/to/retry_join/node2_config2.yml
```
After node1 and node2 is started, they will join into one federation atomically.

We can test the federation with `mosquitto_pub/sub`:
Connect to node2 and subscribe topicA:
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
```
Connect to node1 and send a message to topicA:
```bash
$ mosquitto_pub -t topicA -m 123 -h 127.0.0.1 -p 1883
```
The `mosquitto_sub` will receive "123" and print it in the terminal.
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
123
```

## Join Nodes via REST API
Federation provides gRPC/REST API to join/leave and query members information, see [swagger](./swagger/federation.swagger.json) for details.
In addition to join nodes upon starting up, you can join a node into federation by using `Join` API.

Start node3 with the configuration with empty `retry_join` which means that the node will not join any nodes upon starting up.
```bash
$ gmqttd start -c path/to/retry_join/join_node3_config.yml
```
We can send `Join` request to any nodes in the federation to get node3 joined, for example, sends `Join` request to node1:
```bash
$ curl -X POST -d '{"hosts":["127.0.0.1:8932"]}' '127.0.0.1:8083/v1/federation/join'
{}
```
And check the members in federation:
```bash
curl http://127.0.0.1:8083/v1/federation/members
{
"members": [
{
"name": "node1",
"addr": "192.168.0.105:8902",
"tags": {
"fed_addr": "192.168.0.105:8901"
},
"status": "STATUS_ALIVE"
},
{
"name": "node2",
"addr": "192.168.0.105:8912",
"tags": {
"fed_addr": "192.168.0.105:8911"
},
"status": "STATUS_ALIVE"
},
{
"name": "node3",
"addr": "192.168.0.105:8932",
"tags": {
"fed_addr": "192.168.0.105:8931"
},
"status": "STATUS_ALIVE"
}
]
}%
```
You will see there are 3 nodes ara alive in the federation.

## Configuration
```go
// Config is the configuration for the federation plugin.
type Config struct {
// NodeName is the unique identifier for the node in the federation. Defaults to hostname.
NodeName string `yaml:"node_name"`
// FedAddr is the gRPC server listening address for the federation internal communication.
// Defaults to :8901.
// If the port is missing, the default federation port (8901) will be used.
FedAddr string `yaml:"fed_addr"`
// AdvertiseFedAddr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
// Defaults to "FedAddr" or the private IP address of the node if the IP in "FedAddr" is 0.0.0.0.
// However, in some cases, there may be a routable address that cannot be bound.
// If the port is missing, the default federation port (8901) will be used.
AdvertiseFedAddr string `yaml:"advertise_fed_addr"`
// GossipAddr is the address that the gossip will listen on, It is used for both UDP and TCP gossip. Defaults to :8902
GossipAddr string `yaml:"gossip_addr"`
// AdvertiseGossipAddr is used to change the gossip server address that we advertise to other nodes in the cluster.
// Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
// If the port is missing, the default gossip port (8902) will be used.
AdvertiseGossipAddr string `yaml:"advertise_gossip_addr"`
// RetryJoin is the address of other nodes to join upon starting up.
// If port is missing, the default gossip port (8902) will be used.
RetryJoin []string `yaml:"retry_join"`
// RetryInterval is the time to wait between join attempts. Defaults to 5s.
RetryInterval time.Duration `yaml:"retry_interval"`
// RetryTimeout is the timeout to wait before joining all nodes in RetryJoin successfully.
// If timeout expires, the server will exit with error. Defaults to 1m.
RetryTimeout time.Duration `yaml:"retry_timeout"`
// SnapshotPath will be pass to "SnapshotPath" in serf configuration.
// When Serf is started with a snapshot,
// it will attempt to join all the previously known nodes until one
// succeeds and will also avoid replaying old user events.
SnapshotPath string `yaml:"snapshot_path"`
// RejoinAfterLeave will be pass to "RejoinAfterLeave" in serf configuration.
// It controls our interaction with the snapshot file.
// When set to false (default), a leave causes a Serf to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `yaml:"rejoin_after_leave"`
}
```

## Implementation Details

### Inner-node Communication
Nodes in the same federation communicate with each other through a couple of gRPC streaming apis:
```proto
message Event {
uint64 id = 1;
oneof Event {
Subscribe Subscribe = 2;
Message message = 3;
Unsubscribe unsubscribe = 4;
}
}
service Federation {
rpc Hello(ClientHello) returns (ServerHello){}
rpc EventStream (stream Event) returns (stream Ack){}
}
```
In general, a node is both Client and Server which implements the `Federation` gRPC service.
* As Client, the node will send subscribe, unsubscribe and message published events to other nodes if necessary.
Each event has a EventID, which is incremental and unique in a session.
* As Server, when receives a event from Client, the node returns an acknowledgement after the event has been handled successfully.

### Session State
The event is designed to be idempotent and will be delivered at least once, just like the QoS 1 message in MQTT protocol.
In order to implement QoS 1 protocol flows, the Client and Server need to associate state with a SessionID,
this is referred to as the Session State. The Server also stores the federation tree and retained messages as part of the Session State.

The Session State in the Client consists of:
* Events which have been sent to the Server, but have not been acknowledged.
* Events pending transmission to the Server.

The Session State in the Server consists of:
* The existence of a Session, even if the rest of the Session State is empty.
* The EventID of the next event that the Server is willing to accept.
* Events which have been received from the Client, but have not sent acknowledged yet.

The Session State stores in memory only. When the Client starts, it generates a random UUID as SessionID.
When the Client detects a new node is joined or reconnects to the Server, it sends the `Hello` request which contains the SessionID to perform a handshake.
During the handshake, the Server will check whether the session for the SessionID exists.

* If the session not exists, the Server sends response with `clean_start=true`.
* If the session exists, the Server sends response with `clean_start=false` and sets the next EventID that it is willing to accept to `next_event_id`.

After handshake succeed, the Client will start `EventStream`:
* If the Client receives `clean_start=true`, it sends all local subscriptions and retained messages to the Server in order to sync the full state.
* If the Client receives `clean_start=false`, it sends events of which the EventID is greater than or equal to `next_event_id`.

### Subscription Tree
Each node in the federation will have two subscription trees, the local tree and the federation tree.
The local tree stores subscriptions for local clients which is managed by gmqtt core and the federation tree stores the subscriptions for remote nodes which is managed by the federation plugin.
The federation tree takes node name as subscriber identifier for subscriptions.
* When receives a sub/unsub packet from a local client, the node will update it's local tree first and then broadcasts the event to other nodes.
* When receives sub/unsub event from a remote node, the node will only update it's federation tree.

All Nodes in the federation will have the same federation tree, and with this tree, the node can determine which node the incoming message should be routed to.
For example, Node1 and Node2 are in the same federation. Client1 connects to Node1 and subscribes to topic a/b, the subscription trees of these two nodes are as follows:

Node1 local tree:

| subscriber | topic |
|------------|-------|
| client1 | a/b |

Node1 federation tree:
empty.

Node2 local tree:
empty.

Node2 federation tree:

| subscriber | topic |
|------------|-------|
| node1 | a/b |

### Message Distribution Process
When an MQTT client publishes a message, the node where it is located queries the federation tree
and forwards the message to the relevant node according to the message topic,
and then the relevant node retrieves the local subscription tree and sends the message to the relevant subscriber.

### Membership Management
Federation uses [Serf](https://github.com/hashicorp/serf) to manage membership.


86 changes: 35 additions & 51 deletions plugin/federation/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

// Default config.
const (
DefaultFedPort = ":8901"
DefaultGossipPort = ":8902"
DefaultFedPort = "8901"
DefaultGossipPort = "8902"
DefaultRetryInterval = 5 * time.Second
DefaultRetryTimeout = 1 * time.Minute
)
Expand Down Expand Up @@ -73,46 +73,35 @@ func isPortNumber(port string) bool {
return false
}

// joinHostPort returns a network address of the form "host:port".
// If the addr does not contains "port", the function will add defaultPort to it.
// Note that this function does not guarantee the correctness of the returned address.
func joinHostPort(addr string, defaultPort string) (newAddr string) {
portIndex := strings.LastIndex(addr, ":")
if portIndex == -1 {
return addr + defaultPort
}
if len(addr) == portIndex+1 {
return addr
}
// IPv6
if addr[0] == '[' && !isPortNumber(addr[portIndex+1:]) {
return addr + defaultPort
}
return addr
}

func getAddr(addr string, defaultPort string, fieldName string) (string, error) {
fedAddr := joinHostPort(addr, defaultPort)
_, port, err := net.SplitHostPort(fedAddr)
if err != nil {
return "", fmt.Errorf("invalid %s: %s", fieldName, err)
func getAddr(addr string, defaultPort string, fieldName string, usePrivate bool) (string, error) {
if addr == "" {
return "", fmt.Errorf("missing %s", fieldName)
}
if !isPortNumber(port) {
return "", fmt.Errorf("invalid port number: %s", addr)
host, port, err := net.SplitHostPort(addr)
if port == "" {
port = defaultPort
}
return fedAddr, nil
}

func getAdvertiseAddr(hostPort string) (string, error) {
h, p, _ := net.SplitHostPort(hostPort)
if h == "0.0.0.0" || h == "" {
privateIP, err := getPrivateIP()
if addr[len(addr)-1] == ':' {
return "", fmt.Errorf("invalid %s", fieldName)
}
if err != nil && strings.Contains(err.Error(), "missing port in address") {
host, port, err = net.SplitHostPort(addr + ":" + defaultPort)
if err != nil {
return "", fmt.Errorf("invalid %s: %s", fieldName, err)
}
} else if err != nil {
return "", fmt.Errorf("invalid %s: %s", fieldName, err)
}
if usePrivate && (host == "0.0.0.0" || host == "") {
host, err = getPrivateIP()
if err != nil {
return "", err
}
return privateIP + ":" + p, nil
}
return hostPort, nil
if !isPortNumber(port) {
return "", fmt.Errorf("invalid port number: %s", port)
}
return net.JoinHostPort(host, port), nil
}

// Validate validates the configuration, and return an error if it is invalid.
Expand All @@ -124,36 +113,31 @@ func (c *Config) Validate() (err error) {
}
c.NodeName = hostName
}
c.FedAddr, err = getAddr(c.FedAddr, DefaultFedPort, "fed_addr")
c.FedAddr, err = getAddr(c.FedAddr, DefaultFedPort, "fed_addr", false)
if err != nil {
return err
}
c.GossipAddr, err = getAddr(c.GossipAddr, DefaultGossipPort, "gossip_addr")
c.GossipAddr, err = getAddr(c.GossipAddr, DefaultGossipPort, "gossip_addr", false)
if err != nil {
return err
}
if c.AdvertiseFedAddr == "" {
c.AdvertiseFedAddr, err = getAdvertiseAddr(c.FedAddr)
if err != nil {
return err
}
c.AdvertiseFedAddr = c.FedAddr
}
c.AdvertiseFedAddr, err = getAddr(c.AdvertiseFedAddr, DefaultFedPort, "advertise_fed_addr")
c.AdvertiseFedAddr, err = getAddr(c.AdvertiseFedAddr, DefaultFedPort, "advertise_fed_addr", true)
if err != nil {
return err
}
if c.AdvertiseGossipAddr == "" {
c.AdvertiseGossipAddr, err = getAdvertiseAddr(c.GossipAddr)
if err != nil {
return err
}
c.AdvertiseGossipAddr = c.GossipAddr
}
c.AdvertiseGossipAddr, err = getAddr(c.AdvertiseGossipAddr, DefaultGossipPort, "advertise_gossip_addr")
c.AdvertiseGossipAddr, err = getAddr(c.AdvertiseGossipAddr, DefaultGossipPort, "advertise_gossip_addr", true)
if err != nil {
return err
}

for k, v := range c.RetryJoin {
c.RetryJoin[k], err = getAddr(v, DefaultGossipPort, "retry_join")
c.RetryJoin[k], err = getAddr(v, DefaultGossipPort, "retry_join", false)
if err != nil {
return err
}
Expand All @@ -178,8 +162,8 @@ func init() {
}
DefaultConfig = Config{
NodeName: hostName,
FedAddr: DefaultFedPort,
GossipAddr: DefaultGossipPort,
FedAddr: ":" + DefaultFedPort,
GossipAddr: ":" + DefaultFedPort,
RetryJoin: nil,
RetryInterval: DefaultRetryInterval,
RetryTimeout: DefaultRetryTimeout,
Expand Down
Loading

0 comments on commit 0179b7a

Please sign in to comment.