Skip to content

Commit

Permalink
[CloudSync] Base Code to connect and publishd data to MQTT broker run…
Browse files Browse the repository at this point in the history
…ning on AWS (#436)

Signed-off-by: Nitu Gupta <[email protected]>
  • Loading branch information
nitu-s-gupta authored Dec 20, 2021
1 parent 9c73881 commit fbaadad
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ config SECURE_MODE
default y
---help---
"Secure mode is enable"

config CLOUD_SYNC
bool "CloudSync"
default y
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ifeq ($(CONFIG_SECURE_MODE),y)
RUN_OPTIONS += -e SECURE=true
endif

ifeq ($CONFIG_CLOUD_SYNC),y)
RUN_OPTIONS += -e CLOUD_SYNC=""
endif

# Go parameters
GOCMD := GO111MODULE=on go
GOBUILD := $(GOCMD) build
Expand Down
2 changes: 2 additions & 0 deletions cmd/edge-orchestration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lf-edge/edge-home-orchestration-go/internal/common/fscreator"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/sigmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

"github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr"
Expand Down Expand Up @@ -130,6 +131,7 @@ func orchestrationInit() error {
builder.SetWatcher(configuremgr.GetInstance(configPath, executionType))
builder.SetDiscovery(discoverymgr.GetInstance())
builder.SetStorage(storagemgr.GetInstance())
builder.SetCloudSync(cloudsyncmgr.GetInstance())
builder.SetVerifierConf(verifier.GetInstance())
builder.SetScoring(scoringmgr.GetInstance())
builder.SetService(servicemgr.GetInstance())
Expand Down
1 change: 1 addition & 0 deletions configs/defconfigs/x86_64c
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ CONFIG_CONTAINER=y
# CONFIG_ANDROID is not set
# CONFIG_MNEDC is not set
# CONFIG_SECURE_MODE is not set
CONFIG_CLOUD_SYNC=y
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/edgexfoundry/device-sdk-go v1.4.0
github.com/edgexfoundry/go-mod-core-contracts v0.1.115
github.com/fsnotify/fsnotify v1.4.9
Expand Down
100 changes: 100 additions & 0 deletions internal/common/mqtt/mqttconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package mqtt provides functionalities to handle the client connection to the broker using MQTT
package mqtt

import (
"fmt"
"sync"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

const clientID = "TestHomeEdge"
const mqttPort = 1883

// Client is a wrapper on top of `MQTT.Client`
type Client struct {
ID string
Host string
Port uint
Qos byte
sync.RWMutex
ClientOptions *MQTT.ClientOptions
MQTT.Client
}

//Message is used to wrap the app id and payload into one and publish to broker
type Message struct {
AppID string
Payload string
}

// Config represents an attribute config setter for the `Client`.
type Config func(*Client)

// SetClientID sets the mqtt client id.
func SetClientID(id string) Config {
return func(c *Client) {
c.ID = id
}
}

// SetHost sets the host where to connect.
func SetHost(host string) Config {
return func(c *Client) {
c.Host = host
}
}

// SetPort sets the port where to connect.
func SetPort(port uint) Config {
return func(c *Client) {
c.Port = port
}
}

//SetBrokerURL returns the broker url for connection
func (c *Client) SetBrokerURL(protocol string) string {
return fmt.Sprintf("%s://%s:%d", protocol, c.Host, c.Port)
}

// NewClient returns a configured `Client`. Is mandatory
func NewClient(configs ...Config) (*Client, error) {
client := &Client{
Qos: byte(0),
}

for _, config := range configs {
config(client)
}

copts := MQTT.NewClientOptions()
copts.SetClientID(clientID)
copts.SetAutoReconnect(true)
copts.SetMaxReconnectInterval(1 * time.Second)
copts.SetOnConnectHandler(client.onConnect())
copts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Warn(logPrefix, " disconnected, reason: "+err.Error())
})

client.ClientOptions = copts

return client, nil
}
98 changes: 98 additions & 0 deletions internal/common/mqtt/mqttconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package mqtt provides functionalities to handle the client connection to the broker using MQTT
package mqtt

import (
"encoding/json"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
)

var (
log = logmgr.GetInstance()
logPrefix = "[MQTTConnectionMgr]"
)

// Connect creates a new mqtt client and uses the ClientOptions generated in the NewClient function to connect with the provided host and port.
func (client *Client) Connect() error {

mqttClient := MQTT.NewClient(client.ClientOptions)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

log.Info(logPrefix, "MQTT Connected")

client.Lock()
client.Client = mqttClient

client.Unlock()

return nil
}

//onConnect callback is called on successful connection to broker
func (client *Client) onConnect() MQTT.OnConnectHandler {
log.Info("Running MQTT.OnConnectHandler")
//Adding the subcription if needed for client
return nil
}

//StartMQTTClient is used to initiate the client and set the configuration
func StartMQTTClient(brokerURL string) {

clientConfig, err := NewClient(
SetHost(brokerURL),
SetPort(uint(mqttPort)),
)
if err != nil {
log.Warn(logPrefix, err)
}
clientConfig.ClientOptions.SetOnConnectHandler(clientConfig.onConnect())
URL := clientConfig.SetBrokerURL("tcp")
log.Info(logPrefix, " The broker is", URL)
clientConfig.ClientOptions.AddBroker(URL)

connectErr := clientConfig.Connect()
if connectErr != nil {
log.Warn(logPrefix, connectErr)
}
}

//Publish is used to publish the client data to the cloud
func Publish(client *Client, message Message, topic string) error {

log.Info(logPrefix, "Publishing the data to cloud")
payload, err := json.Marshal(message)
if err != nil {
log.Warn(logPrefix, "Error in Json Marshalling", err)
}
mqttClient := client.Client
for mqttClient == nil {
time.Sleep(time.Second * 2)
}
token := mqttClient.Publish(topic, 0, true, payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}
time.Sleep(time.Second)
return nil
}
32 changes: 32 additions & 0 deletions internal/common/mqtt/mqttconnection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/
package mqtt

import (
"testing"
)

//const Host = "broker.emqx.io"
const Host = "ec2-54-175-241-64.compute-1.amazonaws.com"
const port = "1883"

func TestStartMQTTClient(t *testing.T) {
t.Run("Success", func(t *testing.T) {
StartMQTTClient(Host)
})

}
61 changes: 61 additions & 0 deletions internal/controller/cloudsyncmgr/cloudsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package cloudsyncmgr provides functionalities to handle the cloud synchronization
package cloudsyncmgr

import (
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
)

const (
logPrefix = "[cloudsyncmgr]"
)

// CloudSync is the interface for starting Cloud synchronization
type CloudSync interface {
StartCloudSync(host string) error
}

//CloudSyncImpl struct
type CloudSyncImpl struct{}

var (
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
)

func init() {
cloudsyncIns = &CloudSyncImpl{}

}

// GetInstance returns cloudSync instaance
func GetInstance() CloudSync {
return cloudsyncIns
}

// StartCloudSync starts a server in terms of CloudSync
func (c *CloudSyncImpl) StartCloudSync(host string) (err error) {
if len(host) > 0 {
log.Info("Starting the CLoudsync Mgr")
go mqttmgr.StartMQTTClient(host)
}

return nil
}
Loading

0 comments on commit fbaadad

Please sign in to comment.