From e5107a8cb486ca11ac9f222ef5fe32810f941dd7 Mon Sep 17 00:00:00 2001 From: Nitu Gupta Date: Thu, 9 Dec 2021 19:14:23 +0530 Subject: [PATCH] [CloudSync] Base Code to connect and publishd data to MQTT broker running on AWS Signed-off-by: Nitu Gupta --- Kconfig | 4 + Makefile | 4 + cmd/edge-orchestration/main.go | 2 + configs/defconfigs/x86_64c | 1 + go.mod | 1 + internal/common/mqtt/mqttconfig.go | 100 ++++++++++++++++++ internal/common/mqtt/mqttconnection.go | 98 +++++++++++++++++ internal/common/mqtt/mqttconnection_test.go | 32 ++++++ internal/controller/cloudsyncmgr/cloudsync.go | 61 +++++++++++ .../cloudsyncmgr/mocks/mocks_cloudsync.go | 61 +++++++++++ internal/orchestrationapi/orchestration.go | 14 +++ .../orchestrationapi/orchestration_test.go | 6 ++ internal/orchestrationapi/orchestrationapi.go | 2 + 13 files changed, 386 insertions(+) create mode 100644 internal/common/mqtt/mqttconfig.go create mode 100644 internal/common/mqtt/mqttconnection.go create mode 100644 internal/common/mqtt/mqttconnection_test.go create mode 100644 internal/controller/cloudsyncmgr/cloudsync.go create mode 100644 internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go diff --git a/Kconfig b/Kconfig index d9221782..e4d3c5b6 100644 --- a/Kconfig +++ b/Kconfig @@ -89,3 +89,7 @@ config SECURE_MODE default y ---help--- "Secure mode is enable" + +config CLOUD_SYNC + bool "CloudSync" + default y diff --git a/Makefile b/Makefile index 9e6ee86f..77738f65 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,10 @@ ifeq ($(CONFIG_SECURE_MODE),y) RUN_OPTIONS += -e SECURE=true endif +ifeq ($CONFIG_CLOUD_SYNC),y) +RUN_OPTIONS += -e CLOUD_SYNC="" +endif + # Go parameters GOCMD := GO111MODULE=on go GOBUILD := $(GOCMD) build diff --git a/cmd/edge-orchestration/main.go b/cmd/edge-orchestration/main.go index 36502760..21cf5bfc 100644 --- a/cmd/edge-orchestration/main.go +++ b/cmd/edge-orchestration/main.go @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/edge-home-orchestration-go/internal/common/fscreator" "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/common/sigmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr" @@ -128,6 +129,7 @@ func orchestrationInit() error { builder.SetWatcher(configuremgr.GetInstance(configPath, executionType)) builder.SetDiscovery(discoverymgr.GetInstance()) builder.SetStorage(storagemgr.GetInstance()) + builder.SetCloudSync(cloudsyncmgr.GetInstance()) builder.SetVerifierConf(verifier.GetInstance()) builder.SetScoring(scoringmgr.GetInstance()) builder.SetService(servicemgr.GetInstance()) diff --git a/configs/defconfigs/x86_64c b/configs/defconfigs/x86_64c index c3d883c9..a3618b10 100644 --- a/configs/defconfigs/x86_64c +++ b/configs/defconfigs/x86_64c @@ -13,3 +13,4 @@ CONFIG_CONTAINER=y # CONFIG_ANDROID is not set # CONFIG_MNEDC is not set # CONFIG_SECURE_MODE is not set +CONFIG_CLOUD_SYNC=y diff --git a/go.mod b/go.mod index 04e1d38c..67d7acb9 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.4.0 + github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/edgexfoundry/device-sdk-go v1.4.0 github.com/edgexfoundry/go-mod-core-contracts v0.1.115 github.com/fsnotify/fsnotify v1.4.9 diff --git a/internal/common/mqtt/mqttconfig.go b/internal/common/mqtt/mqttconfig.go new file mode 100644 index 00000000..b9b663b0 --- /dev/null +++ b/internal/common/mqtt/mqttconfig.go @@ -0,0 +1,100 @@ +/******************************************************************************* + * Copyright 2021 Samsung Electronics All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ + +// Package mqtt provides functionalities to handle the client connection to the broker using MQTT +package mqtt + +import ( + "fmt" + "sync" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +const clientID = "TestHomeEdge" +const mqttPort = 1883 + +// Client is a wrapper on top of `MQTT.Client` +type Client struct { + ID string + Host string + Port uint + Qos byte + sync.RWMutex + ClientOptions *MQTT.ClientOptions + MQTT.Client +} + +//Message is used to wrap the app id and payload into one and publish to broker +type Message struct { + AppID string + Payload string +} + +// Config represents an attribute config setter for the `Client`. +type Config func(*Client) + +// SetClientID sets the mqtt client id. +func SetClientID(id string) Config { + return func(c *Client) { + c.ID = id + } +} + +// SetHost sets the host where to connect. +func SetHost(host string) Config { + return func(c *Client) { + c.Host = host + } +} + +// SetPort sets the port where to connect. +func SetPort(port uint) Config { + return func(c *Client) { + c.Port = port + } +} + +//SetBrokerURL returns the broker url for connection +func (c *Client) SetBrokerURL(protocol string) string { + return fmt.Sprintf("%s://%s:%d", protocol, c.Host, c.Port) +} + +// NewClient returns a configured `Client`. Is mandatory +func NewClient(configs ...Config) (*Client, error) { + client := &Client{ + Qos: byte(0), + } + + for _, config := range configs { + config(client) + } + + copts := MQTT.NewClientOptions() + copts.SetClientID(clientID) + copts.SetAutoReconnect(true) + copts.SetMaxReconnectInterval(1 * time.Second) + copts.SetOnConnectHandler(client.onConnect()) + copts.SetConnectionLostHandler(func(c MQTT.Client, err error) { + log.Warn(logPrefix, " disconnected, reason: "+err.Error()) + }) + + client.ClientOptions = copts + + return client, nil +} diff --git a/internal/common/mqtt/mqttconnection.go b/internal/common/mqtt/mqttconnection.go new file mode 100644 index 00000000..6bcd125b --- /dev/null +++ b/internal/common/mqtt/mqttconnection.go @@ -0,0 +1,98 @@ +/******************************************************************************* + * Copyright 2021 Samsung Electronics All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ + +// Package mqtt provides functionalities to handle the client connection to the broker using MQTT +package mqtt + +import ( + "encoding/json" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" +) + +var ( + log = logmgr.GetInstance() + logPrefix = "[MQTTConnectionMgr]" +) + +// Connect creates a new mqtt client and uses the ClientOptions generated in the NewClient function to connect with the provided host and port. +func (client *Client) Connect() error { + + mqttClient := MQTT.NewClient(client.ClientOptions) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + log.Info(logPrefix, "MQTT Connected") + + client.Lock() + client.Client = mqttClient + + client.Unlock() + + return nil +} + +//onConnect callback is called on successful connection to broker +func (client *Client) onConnect() MQTT.OnConnectHandler { + log.Info("Running MQTT.OnConnectHandler") + //Adding the subcription if needed for client + return nil +} + +//StartMQTTClient is used to initiate the client and set the configuration +func StartMQTTClient(brokerURL string) { + + clientConfig, err := NewClient( + SetHost(brokerURL), + SetPort(uint(mqttPort)), + ) + if err != nil { + log.Warn(logPrefix, err) + } + clientConfig.ClientOptions.SetOnConnectHandler(clientConfig.onConnect()) + URL := clientConfig.SetBrokerURL("tcp") + log.Info(logPrefix, " The broker is", URL) + clientConfig.ClientOptions.AddBroker(URL) + + connectErr := clientConfig.Connect() + if connectErr != nil { + log.Warn(logPrefix, connectErr) + } +} + +//Publish is used to publish the client data to the cloud +func Publish(client *Client, message Message, topic string) error { + + log.Info(logPrefix, "Publishing the data to cloud") + payload, err := json.Marshal(message) + if err != nil { + log.Warn(logPrefix, "Error in Json Marshalling", err) + } + mqttClient := client.Client + for mqttClient == nil { + time.Sleep(time.Second * 2) + } + token := mqttClient.Publish(topic, 0, true, payload) + if token.Wait() && token.Error() != nil { + return token.Error() + } + time.Sleep(time.Second) + return nil +} diff --git a/internal/common/mqtt/mqttconnection_test.go b/internal/common/mqtt/mqttconnection_test.go new file mode 100644 index 00000000..a9355134 --- /dev/null +++ b/internal/common/mqtt/mqttconnection_test.go @@ -0,0 +1,32 @@ +/******************************************************************************* + * Copyright 2021 Samsung Electronics All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ +package mqtt + +import ( + "testing" +) + +//const Host = "broker.emqx.io" +const Host = "ec2-54-175-241-64.compute-1.amazonaws.com" +const port = "1883" + +func TestStartMQTTClient(t *testing.T) { + t.Run("Success", func(t *testing.T) { + StartMQTTClient(Host) + }) + +} diff --git a/internal/controller/cloudsyncmgr/cloudsync.go b/internal/controller/cloudsyncmgr/cloudsync.go new file mode 100644 index 00000000..51f75bc6 --- /dev/null +++ b/internal/controller/cloudsyncmgr/cloudsync.go @@ -0,0 +1,61 @@ +/******************************************************************************* + * Copyright 2021 Samsung Electronics All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ + +// Package cloudsyncmgr provides functionalities to handle the cloud synchronization +package cloudsyncmgr + +import ( + "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" + mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt" +) + +const ( + logPrefix = "[cloudsyncmgr]" +) + +// CloudSync is the interface for starting Cloud synchronization +type CloudSync interface { + StartCloudSync(host string) error +} + +//CloudSyncImpl struct +type CloudSyncImpl struct{} + +var ( + cloudsyncIns *CloudSyncImpl + log = logmgr.GetInstance() +) + +func init() { + cloudsyncIns = &CloudSyncImpl{} + +} + +// GetInstance returns cloudSync instaance +func GetInstance() CloudSync { + return cloudsyncIns +} + +// StartCloudSync starts a server in terms of CloudSync +func (c *CloudSyncImpl) StartCloudSync(host string) (err error) { + if len(host) > 0 { + log.Info("Starting the CLoudsync Mgr") + go mqttmgr.StartMQTTClient(host) + } + + return nil +} diff --git a/internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go b/internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go new file mode 100644 index 00000000..53075de4 --- /dev/null +++ b/internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go @@ -0,0 +1,61 @@ +/******************************************************************************* +* Copyright 2021 Samsung Electronics All Rights Reserved. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*******************************************************************************/ + +// Code generated by MockGen. DO NOT EDIT. +// Source: cloudsync.go + +package mocks + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockCloudSync is a mock of CloudSync interface +type MockCloudSync struct { + ctrl *gomock.Controller + recorder *MockCloudSyncMockRecorder +} + +// MockCloudSyncMockRecorder is the mock recorder for MockCloudSync +type MockCloudSyncMockRecorder struct { + mock *MockCloudSync +} + +// NewMockCloudSync creates a new mock instance +func NewMockCloudSync(ctrl *gomock.Controller) *MockCloudSync { + mock := &MockCloudSync{ctrl: ctrl} + mock.recorder = &MockCloudSyncMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (_m *MockCloudSync) EXPECT() *MockCloudSyncMockRecorder { + return _m.recorder +} + +// StartCloudSync mocks base method +func (_m *MockCloudSync) StartCloudSync(host string) error { + ret := _m.ctrl.Call(_m, "StartCloudSync", host) + ret0, _ := ret[0].(error) + return ret0 +} + +// StartCloudSync indicates an expected call of StartCloudSync +func (_mr *MockCloudSyncMockRecorder) StartCloudSync(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "StartCloudSync", reflect.TypeOf((*MockCloudSync)(nil).StartCloudSync), arg0) +} diff --git a/internal/orchestrationapi/orchestration.go b/internal/orchestrationapi/orchestration.go index a1e53f43..5c46f7b9 100644 --- a/internal/orchestrationapi/orchestration.go +++ b/internal/orchestrationapi/orchestration.go @@ -20,9 +20,11 @@ package orchestrationapi import ( "errors" + "os" "time" "github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator" @@ -33,6 +35,7 @@ import ( "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/discoverymgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/scoringmgr" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/securemgr/verifier" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/servicemgr/executor" @@ -110,6 +113,9 @@ type OrchestrationBuilder struct { isSetStorage bool storageIns storagemgr.Storage + isSetCloudSync bool + cloudsyncIns cloudsyncmgr.CloudSync + isSetWatcher bool watcherIns configuremgr.Watcher @@ -147,6 +153,12 @@ func (o *OrchestrationBuilder) SetStorage(d storagemgr.Storage) { o.storageIns = d } +// SetCloudSync registers the interface to handle orchestration CloudSync +func (o *OrchestrationBuilder) SetCloudSync(d cloudsyncmgr.CloudSync) { + o.isSetCloudSync = true + o.cloudsyncIns = d +} + // SetWatcher registers the interface to check if service applications are installed func (o *OrchestrationBuilder) SetWatcher(w configuremgr.Watcher) { o.isSetWatcher = true @@ -183,6 +195,7 @@ func (o OrchestrationBuilder) Build() Orche { orcheIns.scoringIns = o.scoringIns orcheIns.discoverIns = o.discoveryIns orcheIns.storageIns = o.storageIns + orcheIns.cloudsyncIns = o.cloudsyncIns orcheIns.verifierIns = o.verifierIns orcheIns.watcher = o.watcherIns orcheIns.serviceIns = o.serviceIns @@ -202,6 +215,7 @@ func (o *orcheImpl) Start(deviceIDPath string, platform string, executionType st resourceMonitorImpl.StartMonitoringResource() o.discoverIns.StartDiscovery(deviceIDPath, platform, executionType) o.storageIns.StartStorage("") + o.cloudsyncIns.StartCloudSync(os.Getenv("CLOUD_SYNC")) o.watcher.Watch(o) o.Ready = true time.Sleep(1000) diff --git a/internal/orchestrationapi/orchestration_test.go b/internal/orchestrationapi/orchestration_test.go index 076b0743..6d4d401f 100644 --- a/internal/orchestrationapi/orchestration_test.go +++ b/internal/orchestrationapi/orchestration_test.go @@ -26,6 +26,7 @@ import ( networkmocks "github.com/lf-edge/edge-home-orchestration-go/internal/common/networkhelper/mocks" resourceutilmocks "github.com/lf-edge/edge-home-orchestration-go/internal/common/resourceutil/mocks" "github.com/lf-edge/edge-home-orchestration-go/internal/common/types/configuremgrtypes" + cloudsyncmocks "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr/mocks" contextmgrmocks "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr/mocks" discoverymocks "github.com/lf-edge/edge-home-orchestration-go/internal/controller/discoverymgr/mocks" scoringmocks "github.com/lf-edge/edge-home-orchestration-go/internal/controller/scoringmgr/mocks" @@ -47,6 +48,7 @@ var ( mockDiscovery *discoverymocks.MockDiscovery mockScoring *scoringmocks.MockScoring mockStorage *storagemocks.MockStorage + mockCloudSync *cloudsyncmocks.MockCloudSync mockService *servicemocks.MockServiceMgr mockExecutor *executormocks.MockServiceExecutor mockDBHelper *dbhelpermocks.MockMultipleBucketQuery @@ -63,6 +65,7 @@ func createMockIns(ctrl *gomock.Controller) { mockDiscovery = discoverymocks.NewMockDiscovery(ctrl) mockScoring = scoringmocks.NewMockScoring(ctrl) mockStorage = storagemocks.NewMockStorage(ctrl) + mockCloudSync = cloudsyncmocks.NewMockCloudSync(ctrl) mockService = servicemocks.NewMockServiceMgr(ctrl) mockExecutor = executormocks.NewMockServiceExecutor(ctrl) mockDBHelper = dbhelpermocks.NewMockMultipleBucketQuery(ctrl) @@ -80,6 +83,7 @@ func getOcheIns(ctrl *gomock.Controller) Orche { builder.SetExecutor(mockExecutor) builder.SetScoring(mockScoring) builder.SetStorage(mockStorage) + builder.SetCloudSync(mockCloudSync) builder.SetService(mockService) builder.SetWatcher(mockWatcher) builder.SetClient(mockClient) @@ -111,6 +115,7 @@ func TestBuild(t *testing.T) { builder.SetExecutor(mockExecutor) builder.SetScoring(mockScoring) builder.SetStorage(mockStorage) + builder.SetCloudSync(mockCloudSync) builder.SetService(mockService) builder.SetWatcher(mockWatcher) builder.SetClient(mockClient) @@ -219,6 +224,7 @@ func TestStart(t *testing.T) { mockResourceutil.EXPECT().StartMonitoringResource(), mockDiscovery.EXPECT().StartDiscovery(gomock.Eq(deviceIDPath), gomock.Eq(platform), gomock.Eq(executionType)), mockStorage.EXPECT().StartStorage(gomock.Any()), + mockCloudSync.EXPECT().StartCloudSync(gomock.Any()), mockWatcher.EXPECT().Watch(gomock.Any()), ) diff --git a/internal/orchestrationapi/orchestrationapi.go b/internal/orchestrationapi/orchestrationapi.go index d882c95e..41e471a1 100644 --- a/internal/orchestrationapi/orchestrationapi.go +++ b/internal/orchestrationapi/orchestrationapi.go @@ -27,6 +27,7 @@ import ( "github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator" "github.com/lf-edge/edge-home-orchestration-go/internal/common/networkhelper" "github.com/lf-edge/edge-home-orchestration-go/internal/common/requestervalidator" + "github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/discoverymgr" "github.com/lf-edge/edge-home-orchestration-go/internal/controller/scoringmgr" @@ -50,6 +51,7 @@ type orcheImpl struct { watcher configuremgr.Watcher notificationIns notification.Notification storageIns storagemgr.Storage + cloudsyncIns cloudsyncmgr.CloudSync networkhelper networkhelper.Network clientAPI client.Clienter }