diff --git a/Kconfig b/Kconfig index d9221782..e4d3c5b6 100644 --- a/Kconfig +++ b/Kconfig @@ -89,3 +89,7 @@ config SECURE_MODE default y ---help--- "Secure mode is enable" + +config CLOUD_SYNC + bool "CloudSync" + default y diff --git a/Makefile b/Makefile index 9e6ee86f..175effa9 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,10 @@ ifeq ($(CONFIG_SECURE_MODE),y) RUN_OPTIONS += -e SECURE=true endif +ifeq ($CONFIG_CLOUD_SYNC),y) +RUN_OPTIONS += -e CLOUD_SYNC=$(url) +endif + # Go parameters GOCMD := GO111MODULE=on go GOBUILD := $(GOCMD) build diff --git a/cmd/edge-orchestration/main.go b/cmd/edge-orchestration/main.go index 36502760..f6336130 100644 --- a/cmd/edge-orchestration/main.go +++ b/cmd/edge-orchestration/main.go @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/edge-home-orchestration-go/internal/common/fscreator" "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/common/sigmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr" @@ -100,6 +101,7 @@ func orchestrationInit() error { secure := os.Getenv("SECURE") mnedc := os.Getenv("MNEDC") + cloudsync := os.Getenv("CLOUD_SYNC") isSecured := false if len(secure) > 0 { @@ -128,6 +130,7 @@ func orchestrationInit() error { builder.SetWatcher(configuremgr.GetInstance(configPath, executionType)) builder.SetDiscovery(discoverymgr.GetInstance()) builder.SetStorage(storagemgr.GetInstance()) + builder.SetCloudSync(cloudsyncmgr.GetInstance()) builder.SetVerifierConf(verifier.GetInstance()) builder.SetScoring(scoringmgr.GetInstance()) builder.SetService(servicemgr.GetInstance()) @@ -190,6 +193,12 @@ func orchestrationInit() error { go discoverymgr.GetInstance().StartMNEDCClient(deviceIDFilePath, mnedcServerConfig) } } + if len(cloudsync) > 0 { + //initiate cloudsync + cloudsyncmgr.GetInstance().StartCloudSync(cloudsync) + } else { + log.Info(logPrefix, "Clousync is off") + } return nil } diff --git a/configs/defconfigs/x86_64c b/configs/defconfigs/x86_64c index c3d883c9..a3618b10 100644 --- a/configs/defconfigs/x86_64c +++ b/configs/defconfigs/x86_64c @@ -13,3 +13,4 @@ CONFIG_CONTAINER=y # CONFIG_ANDROID is not set # CONFIG_MNEDC is not set # CONFIG_SECURE_MODE is not set +CONFIG_CLOUD_SYNC=y diff --git a/go.mod b/go.mod index 04e1d38c..67d7acb9 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.4.0 + github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/edgexfoundry/device-sdk-go v1.4.0 github.com/edgexfoundry/go-mod-core-contracts v0.1.115 github.com/fsnotify/fsnotify v1.4.9 diff --git a/internal/common/mqtt/mqttconfig.go b/internal/common/mqtt/mqttconfig.go new file mode 100644 index 00000000..a129c3f0 --- /dev/null +++ b/internal/common/mqtt/mqttconfig.go @@ -0,0 +1,82 @@ +package mqtt + +import ( + "fmt" + "sync" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +const clientID = "TestHomeEdge" +const mqttPort = 1883 + +// Client is a wrapper on top of `MQTT.Client` +type Client struct { + ID string + Host string + Port uint + Qos byte + sync.RWMutex + ClientOptions *MQTT.ClientOptions + MQTT.Client +} + +//Message is used to wrap the app id and payload into one and publish to broker +type Message struct { + AppID string + Payload string +} + +// Config represents an attribute config setter for the `Client`. +type Config func(*Client) + +// SetClientID sets the mqtt client id. +func SetClientID(id string) Config { + return func(c *Client) { + c.ID = id + } +} + +// SetHost sets the host where to connect. +func SetHost(host string) Config { + return func(c *Client) { + c.Host = host + } +} + +// SetPort sets the port where to connect. +func SetPort(port uint) Config { + return func(c *Client) { + c.Port = port + } +} + +//SetBrokerURL returns the broker url for connection +func (c *Client) SetBrokerURL(protocol string) string { + return fmt.Sprintf("%s://%s:%d", protocol, c.Host, c.Port) +} + +// NewClient returns a configured `Client`. Is mandatory +func NewClient(configs ...Config) (*Client, error) { + client := &Client{ + Qos: byte(0), + } + + for _, config := range configs { + config(client) + } + + copts := MQTT.NewClientOptions() + copts.SetClientID(clientID) + copts.SetAutoReconnect(true) + copts.SetMaxReconnectInterval(1 * time.Second) + copts.SetOnConnectHandler(client.onConnect()) + copts.SetConnectionLostHandler(func(c MQTT.Client, err error) { + log.Warn(logPrefix, " disconnected, reason: "+err.Error()) + }) + + client.ClientOptions = copts + + return client, nil +} diff --git a/internal/common/mqtt/mqttconnection.go b/internal/common/mqtt/mqttconnection.go new file mode 100644 index 00000000..f5e712a0 --- /dev/null +++ b/internal/common/mqtt/mqttconnection.go @@ -0,0 +1,80 @@ +package mqtt + +import ( + "encoding/json" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" +) + +var ( + log = logmgr.GetInstance() + logPrefix = "[MQTTConnectionMgr]" +) + +// Connect creates a new mqtt client and uses the ClientOptions generated in the NewClient function to connect with the provided host and port. +func (client *Client) Connect() error { + + mqttClient := MQTT.NewClient(client.ClientOptions) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + log.Info(logPrefix, "MQTT Connected") + + client.Lock() + client.Client = mqttClient + + client.Unlock() + + return nil +} + +//onConnect callback is called on successful connection to broker +func (client *Client) onConnect() MQTT.OnConnectHandler { + log.Info("Running MQTT.OnConnectHandler") + //Adding the subcription if needed for client + return nil +} + +//StartMQTTClient is used to initiate the client and set the configuration +func StartMQTTClient(brokerURL string) { + + clientConfig, err := NewClient( + SetHost(brokerURL), + SetPort(uint(mqttPort)), + ) + if err != nil { + log.Warn(logPrefix, err) + } + clientConfig.ClientOptions.SetOnConnectHandler(clientConfig.onConnect()) + URL := clientConfig.SetBrokerURL("tcp") + log.Info(logPrefix, " The broker is", URL) + clientConfig.ClientOptions.AddBroker(URL) + + connectErr := clientConfig.Connect() + if connectErr != nil { + log.Warn(logPrefix, connectErr) + } +} + +//Publish is used to publish the client data to the cloud +func Publish(client *Client, message Message, topic string) error { + + log.Info(logPrefix, "Publishing the data to cloud") + payload, err := json.Marshal(message) + if err != nil { + log.Warn(logPrefix, "Error in Json Marshalling", err) + } + mqttClient := client.Client + for mqttClient == nil { + time.Sleep(time.Second * 2) + } + token := mqttClient.Publish(topic, 0, true, payload) + if token.Wait() && token.Error() != nil { + return token.Error() + } + time.Sleep(time.Second) + return nil +} diff --git a/internal/common/mqtt/mqttconnection_test.go b/internal/common/mqtt/mqttconnection_test.go new file mode 100644 index 00000000..11fc5828 --- /dev/null +++ b/internal/common/mqtt/mqttconnection_test.go @@ -0,0 +1,16 @@ +package mqtt + +import ( + "testing" +) + +//const Host = "broker.emqx.io" +const Host = "ec2-54-175-241-64.compute-1.amazonaws.com" +const port = "1883" + +func TestStartMQTTClient(t *testing.T) { + t.Run("Success", func(t *testing.T) { + StartMQTTClient(Host) + }) + +} diff --git a/internal/controller/cloudsyncmgr/cloudsync.go b/internal/controller/cloudsyncmgr/cloudsync.go new file mode 100644 index 00000000..d071b193 --- /dev/null +++ b/internal/controller/cloudsyncmgr/cloudsync.go @@ -0,0 +1,36 @@ +package cloudsyncmgr + +import ( + mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt" +) + +const ( + logPrefix = "[cloudsyncmgr]" +) + +// CloudSync is the interface for starting Cloud synchronization +type CloudSync interface { + StartCloudSync(host string) error +} + +//CloudSyncImpl struct +type CloudSyncImpl struct{} + +var ( + cloudsyncIns *CloudSyncImpl +) + +func init() { + cloudsyncIns = &CloudSyncImpl{} +} + +// GetInstance returns cloudSync instaance +func GetInstance() CloudSync { + return cloudsyncIns +} + +// StartCloudSync starts a server in terms of CloudSync +func (c *CloudSyncImpl) StartCloudSync(host string) (err error) { + mqttmgr.StartMQTTClient(host) + return nil +} diff --git a/internal/orchestrationapi/orchestration.go b/internal/orchestrationapi/orchestration.go index a1e53f43..c5f03ee7 100644 --- a/internal/orchestrationapi/orchestration.go +++ b/internal/orchestrationapi/orchestration.go @@ -23,6 +23,7 @@ import ( "time" "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator" @@ -33,6 +34,7 @@ import ( "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/discoverymgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/scoringmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/securemgr/verifier" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr/executor" @@ -110,6 +112,9 @@ type OrchestrationBuilder struct { isSetStorage bool storageIns storagemgr.Storage + isSetCloudSync bool + cloudsyncIns cloudsyncmgr.CloudSync + isSetWatcher bool watcherIns configuremgr.Watcher @@ -147,6 +152,12 @@ func (o *OrchestrationBuilder) SetStorage(d storagemgr.Storage) { o.storageIns = d } +// SetCloudSync registers the interface to handle orchestration CloudSync +func (o *OrchestrationBuilder) SetCloudSync(d cloudsyncmgr.CloudSync) { + o.isSetCloudSync = true + o.cloudsyncIns = d +} + // SetWatcher registers the interface to check if service applications are installed func (o *OrchestrationBuilder) SetWatcher(w configuremgr.Watcher) { o.isSetWatcher = true