Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CloudSync] Base Code to connect to MQTT broker running on AWS #436

Merged
merged 1 commit into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ config SECURE_MODE
default y
---help---
"Secure mode is enable"

config CLOUD_SYNC
bool "CloudSync"
default y
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ifeq ($(CONFIG_SECURE_MODE),y)
RUN_OPTIONS += -e SECURE=true
endif

ifeq ($CONFIG_CLOUD_SYNC),y)
RUN_OPTIONS += -e CLOUD_SYNC=""
endif

tdrozdovsky marked this conversation as resolved.
Show resolved Hide resolved
# Go parameters
GOCMD := GO111MODULE=on go
GOBUILD := $(GOCMD) build
Expand Down
2 changes: 2 additions & 0 deletions cmd/edge-orchestration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lf-edge/edge-home-orchestration-go/internal/common/fscreator"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/sigmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

"github.com/lf-edge/edge-home-orchestration-go/internal/controller/configuremgr"
Expand Down Expand Up @@ -128,6 +129,7 @@ func orchestrationInit() error {
builder.SetWatcher(configuremgr.GetInstance(configPath, executionType))
builder.SetDiscovery(discoverymgr.GetInstance())
builder.SetStorage(storagemgr.GetInstance())
builder.SetCloudSync(cloudsyncmgr.GetInstance())
builder.SetVerifierConf(verifier.GetInstance())
builder.SetScoring(scoringmgr.GetInstance())
builder.SetService(servicemgr.GetInstance())
Expand Down
1 change: 1 addition & 0 deletions configs/defconfigs/x86_64c
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ CONFIG_CONTAINER=y
# CONFIG_ANDROID is not set
# CONFIG_MNEDC is not set
# CONFIG_SECURE_MODE is not set
CONFIG_CLOUD_SYNC=y
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/edgexfoundry/device-sdk-go v1.4.0
github.com/edgexfoundry/go-mod-core-contracts v0.1.115
github.com/fsnotify/fsnotify v1.4.9
Expand Down
100 changes: 100 additions & 0 deletions internal/common/mqtt/mqttconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package mqtt provides functionalities to handle the client connection to the broker using MQTT
package mqtt
MoonkiHong marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"sync"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

const clientID = "TestHomeEdge"
const mqttPort = 1883

// Client is a wrapper on top of `MQTT.Client`
type Client struct {
ID string
Host string
Port uint
Qos byte
sync.RWMutex
ClientOptions *MQTT.ClientOptions
MQTT.Client
}

//Message is used to wrap the app id and payload into one and publish to broker
type Message struct {
AppID string
Payload string
}

// Config represents an attribute config setter for the `Client`.
type Config func(*Client)

// SetClientID sets the mqtt client id.
func SetClientID(id string) Config {
return func(c *Client) {
c.ID = id
}
}

// SetHost sets the host where to connect.
func SetHost(host string) Config {
return func(c *Client) {
c.Host = host
}
}

// SetPort sets the port where to connect.
func SetPort(port uint) Config {
return func(c *Client) {
c.Port = port
}
}

//SetBrokerURL returns the broker url for connection
func (c *Client) SetBrokerURL(protocol string) string {
return fmt.Sprintf("%s://%s:%d", protocol, c.Host, c.Port)
}

// NewClient returns a configured `Client`. Is mandatory
func NewClient(configs ...Config) (*Client, error) {
client := &Client{
Qos: byte(0),
}

for _, config := range configs {
config(client)
}

copts := MQTT.NewClientOptions()
copts.SetClientID(clientID)
copts.SetAutoReconnect(true)
copts.SetMaxReconnectInterval(1 * time.Second)
copts.SetOnConnectHandler(client.onConnect())
copts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Warn(logPrefix, " disconnected, reason: "+err.Error())
})

client.ClientOptions = copts

return client, nil
}
98 changes: 98 additions & 0 deletions internal/common/mqtt/mqttconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package mqtt provides functionalities to handle the client connection to the broker using MQTT
package mqtt
MoonkiHong marked this conversation as resolved.
Show resolved Hide resolved

import (
"encoding/json"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
)

var (
log = logmgr.GetInstance()
logPrefix = "[MQTTConnectionMgr]"
)

// Connect creates a new mqtt client and uses the ClientOptions generated in the NewClient function to connect with the provided host and port.
func (client *Client) Connect() error {

mqttClient := MQTT.NewClient(client.ClientOptions)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

log.Info(logPrefix, "MQTT Connected")

client.Lock()
client.Client = mqttClient

client.Unlock()

return nil
}

//onConnect callback is called on successful connection to broker
func (client *Client) onConnect() MQTT.OnConnectHandler {
log.Info("Running MQTT.OnConnectHandler")
//Adding the subcription if needed for client
return nil
}

//StartMQTTClient is used to initiate the client and set the configuration
func StartMQTTClient(brokerURL string) {

clientConfig, err := NewClient(
SetHost(brokerURL),
SetPort(uint(mqttPort)),
)
if err != nil {
log.Warn(logPrefix, err)
}
clientConfig.ClientOptions.SetOnConnectHandler(clientConfig.onConnect())
URL := clientConfig.SetBrokerURL("tcp")
log.Info(logPrefix, " The broker is", URL)
clientConfig.ClientOptions.AddBroker(URL)

connectErr := clientConfig.Connect()
if connectErr != nil {
log.Warn(logPrefix, connectErr)
}
}

//Publish is used to publish the client data to the cloud
func Publish(client *Client, message Message, topic string) error {

log.Info(logPrefix, "Publishing the data to cloud")
payload, err := json.Marshal(message)
if err != nil {
log.Warn(logPrefix, "Error in Json Marshalling", err)
}
mqttClient := client.Client
for mqttClient == nil {
time.Sleep(time.Second * 2)
}
token := mqttClient.Publish(topic, 0, true, payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}
time.Sleep(time.Second)
return nil
}
32 changes: 32 additions & 0 deletions internal/common/mqtt/mqttconnection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/
package mqtt
MoonkiHong marked this conversation as resolved.
Show resolved Hide resolved

import (
"testing"
)

//const Host = "broker.emqx.io"
const Host = "ec2-54-175-241-64.compute-1.amazonaws.com"
const port = "1883"

func TestStartMQTTClient(t *testing.T) {
t.Run("Success", func(t *testing.T) {
StartMQTTClient(Host)
})

}
61 changes: 61 additions & 0 deletions internal/controller/cloudsyncmgr/cloudsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*******************************************************************************
* Copyright 2021 Samsung Electronics All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

// Package cloudsyncmgr provides functionalities to handle the cloud synchronization
package cloudsyncmgr
MoonkiHong marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
)

const (
logPrefix = "[cloudsyncmgr]"
)

// CloudSync is the interface for starting Cloud synchronization
type CloudSync interface {
StartCloudSync(host string) error
}

//CloudSyncImpl struct
type CloudSyncImpl struct{}

var (
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
)

func init() {
cloudsyncIns = &CloudSyncImpl{}

}

// GetInstance returns cloudSync instaance
func GetInstance() CloudSync {
return cloudsyncIns
}

// StartCloudSync starts a server in terms of CloudSync
func (c *CloudSyncImpl) StartCloudSync(host string) (err error) {
if len(host) > 0 {
log.Info("Starting the CLoudsync Mgr")
go mqttmgr.StartMQTTClient(host)
}

return nil
}
Loading