Skip to content

Commit

Permalink
[CloudSync] Base Code to connect to MQTT broker running on AWS
Browse files Browse the repository at this point in the history
Signed-off-by: Nitu Gupta <[email protected]>
  • Loading branch information
nitu-s-gupta committed Dec 14, 2021
1 parent 0af6955 commit 3b39427
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ config SECURE_MODE
default y
---help---
"Secure mode is enable"

config CLOUD_SYNC
bool "CloudSync"
default y
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ifeq ($(CONFIG_SECURE_MODE),y)
RUN_OPTIONS += -e SECURE=true
endif

ifeq ($CONFIG_CLOUD_SYNC),y)
RUN_OPTIONS += -e CLOUD_SYNC=$(url)
endif

# Go parameters
GOCMD := GO111MODULE=on go
GOBUILD := $(GOCMD) build
Expand Down
9 changes: 9 additions & 0 deletions cmd/edge-orchestration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lf-edge/edge-home-orchestration-go/internal/common/fscreator"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/sigmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

"github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr"
Expand Down Expand Up @@ -100,6 +101,7 @@ func orchestrationInit() error {

secure := os.Getenv("SECURE")
mnedc := os.Getenv("MNEDC")
cloudsync := os.Getenv("CLOUD_SYNC")

isSecured := false
if len(secure) > 0 {
Expand Down Expand Up @@ -128,6 +130,7 @@ func orchestrationInit() error {
builder.SetWatcher(configuremgr.GetInstance(configPath, executionType))
builder.SetDiscovery(discoverymgr.GetInstance())
builder.SetStorage(storagemgr.GetInstance())
builder.SetCloudSync(cloudsyncmgr.GetInstance())
builder.SetVerifierConf(verifier.GetInstance())
builder.SetScoring(scoringmgr.GetInstance())
builder.SetService(servicemgr.GetInstance())
Expand Down Expand Up @@ -190,6 +193,12 @@ func orchestrationInit() error {
go discoverymgr.GetInstance().StartMNEDCClient(deviceIDFilePath, mnedcServerConfig)
}
}
if len(cloudsync) > 0 {
//initiate cloudsync
cloudsyncmgr.GetInstance().StartCloudSync(cloudsync)
} else {
log.Info(logPrefix, "Clousync is off")
}

return nil
}
1 change: 1 addition & 0 deletions configs/defconfigs/x86_64c
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ CONFIG_CONTAINER=y
# CONFIG_ANDROID is not set
# CONFIG_MNEDC is not set
# CONFIG_SECURE_MODE is not set
CONFIG_CLOUD_SYNC=y
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/edgexfoundry/device-sdk-go v1.4.0
github.com/edgexfoundry/go-mod-core-contracts v0.1.115
github.com/fsnotify/fsnotify v1.4.9
Expand Down
82 changes: 82 additions & 0 deletions internal/common/mqtt/mqttconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package mqtt

import (
"fmt"
"sync"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

const clientID = "TestHomeEdge"
const mqttPort = 1883

// Client is a wrapper on top of `MQTT.Client`
type Client struct {
ID string
Host string
Port uint
Qos byte
sync.RWMutex
ClientOptions *MQTT.ClientOptions
MQTT.Client
}

//Message is used to wrap the app id and payload into one and publish to broker
type Message struct {
AppID string
Payload string
}

// Config represents an attribute config setter for the `Client`.
type Config func(*Client)

// SetClientID sets the mqtt client id.
func SetClientID(id string) Config {
return func(c *Client) {
c.ID = id
}
}

// SetHost sets the host where to connect.
func SetHost(host string) Config {
return func(c *Client) {
c.Host = host
}
}

// SetPort sets the port where to connect.
func SetPort(port uint) Config {
return func(c *Client) {
c.Port = port
}
}

//SetBrokerURL returns the broker url for connection
func (c *Client) SetBrokerURL(protocol string) string {
return fmt.Sprintf("%s://%s:%d", protocol, c.Host, c.Port)
}

// NewClient returns a configured `Client`. Is mandatory
func NewClient(configs ...Config) (*Client, error) {
client := &Client{
Qos: byte(0),
}

for _, config := range configs {
config(client)
}

copts := MQTT.NewClientOptions()
copts.SetClientID(clientID)
copts.SetAutoReconnect(true)
copts.SetMaxReconnectInterval(1 * time.Second)
copts.SetOnConnectHandler(client.onConnect())
copts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Warn(logPrefix, " disconnected, reason: "+err.Error())
})

client.ClientOptions = copts

return client, nil
}
80 changes: 80 additions & 0 deletions internal/common/mqtt/mqttconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mqtt

import (
"encoding/json"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
)

var (
log = logmgr.GetInstance()
logPrefix = "[MQTTConnectionMgr]"
)

// Connect creates a new mqtt client and uses the ClientOptions generated in the NewClient function to connect with the provided host and port.
func (client *Client) Connect() error {

mqttClient := MQTT.NewClient(client.ClientOptions)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

log.Info(logPrefix, "MQTT Connected")

client.Lock()
client.Client = mqttClient

client.Unlock()

return nil
}

//onConnect callback is called on successful connection to broker
func (client *Client) onConnect() MQTT.OnConnectHandler {
log.Info("Running MQTT.OnConnectHandler")
//Adding the subcription if needed for client
return nil
}

//StartMQTTClient is used to initiate the client and set the configuration
func StartMQTTClient(brokerURL string) {

clientConfig, err := NewClient(
SetHost(brokerURL),
SetPort(uint(mqttPort)),
)
if err != nil {
log.Warn(logPrefix, err)
}
clientConfig.ClientOptions.SetOnConnectHandler(clientConfig.onConnect())
URL := clientConfig.SetBrokerURL("tcp")
log.Info(logPrefix, " The broker is", URL)
clientConfig.ClientOptions.AddBroker(URL)

connectErr := clientConfig.Connect()
if connectErr != nil {
log.Warn(logPrefix, connectErr)
}
}

//Publish is used to publish the client data to the cloud
func Publish(client *Client, message Message, topic string) error {

log.Info(logPrefix, "Publishing the data to cloud")
payload, err := json.Marshal(message)
if err != nil {
log.Warn(logPrefix, "Error in Json Marshalling", err)
}
mqttClient := client.Client
for mqttClient == nil {
time.Sleep(time.Second * 2)
}
token := mqttClient.Publish(topic, 0, true, payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}
time.Sleep(time.Second)
return nil
}
16 changes: 16 additions & 0 deletions internal/common/mqtt/mqttconnection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mqtt

import (
"testing"
)

//const Host = "broker.emqx.io"
const Host = "ec2-54-175-241-64.compute-1.amazonaws.com"
const port = "1883"

func TestStartMQTTClient(t *testing.T) {
t.Run("Success", func(t *testing.T) {
StartMQTTClient(Host)
})

}
36 changes: 36 additions & 0 deletions internal/controller/cloudsyncmgr/cloudsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cloudsyncmgr

import (
mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
)

const (
logPrefix = "[cloudsyncmgr]"
)

// CloudSync is the interface for starting Cloud synchronization
type CloudSync interface {
StartCloudSync(host string) error
}

//CloudSyncImpl struct
type CloudSyncImpl struct{}

var (
cloudsyncIns *CloudSyncImpl
)

func init() {
cloudsyncIns = &CloudSyncImpl{}
}

// GetInstance returns cloudSync instaance
func GetInstance() CloudSync {
return cloudsyncIns
}

// StartCloudSync starts a server in terms of CloudSync
func (c *CloudSyncImpl) StartCloudSync(host string) (err error) {
mqttmgr.StartMQTTClient(host)
return nil
}
11 changes: 11 additions & 0 deletions internal/orchestrationapi/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/discoverymgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/scoringmgr"

"github.com/lf-edge/edge-home-orchestration-go/internal/controller/securemgr/verifier"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr/executor"
Expand Down Expand Up @@ -110,6 +112,9 @@ type OrchestrationBuilder struct {
isSetStorage bool
storageIns storagemgr.Storage

isSetCloudSync bool
cloudsyncIns cloudsyncmgr.CloudSync

isSetWatcher bool
watcherIns configuremgr.Watcher

Expand Down Expand Up @@ -147,6 +152,12 @@ func (o *OrchestrationBuilder) SetStorage(d storagemgr.Storage) {
o.storageIns = d
}

// SetCloudSync registers the interface to handle orchestration CloudSync
func (o *OrchestrationBuilder) SetCloudSync(d cloudsyncmgr.CloudSync) {
o.isSetCloudSync = true
o.cloudsyncIns = d
}

// SetWatcher registers the interface to check if service applications are installed
func (o *OrchestrationBuilder) SetWatcher(w configuremgr.Watcher) {
o.isSetWatcher = true
Expand Down

0 comments on commit 3b39427

Please sign in to comment.