Skip to content

Commit

Permalink
[CloudSync] Adding the API to be exposed to thirdparty for Data Publish
Browse files Browse the repository at this point in the history
Signed-off-by: Nitu Gupta <[email protected]>
  • Loading branch information
nitu-s-gupta committed Dec 20, 2021
1 parent a521772 commit 2ef35ea
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RUN_OPTIONS += -e SECURE=true
endif

ifeq ($CONFIG_CLOUD_SYNC),y)
RUN_OPTIONS += -e CLOUD_SYNC=""
RUN_OPTIONS += -e CLOUD_SYNC=true
endif

# Go parameters
Expand Down
18 changes: 16 additions & 2 deletions internal/common/mqtt/mqttconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,23 @@ type Message struct {
Payload string
}

var (
mqttClient *Client
)

// Config represents an attribute config setter for the `Client`.
type Config func(*Client)

//SetClient sets the client
func SetClient(client *Client) {
mqttClient = client
}

//GetClient gets the client set
func GetClient() *Client {
return mqttClient
}

// SetClientID sets the mqtt client id.
func SetClientID(id string) Config {
return func(c *Client) {
Expand Down Expand Up @@ -78,20 +92,20 @@ func (c *Client) SetBrokerURL(protocol string) string {
// NewClient returns a configured `Client`. Is mandatory
func NewClient(configs ...Config) (*Client, error) {
client := &Client{
Qos: byte(0),
Qos: byte(1),
}

for _, config := range configs {
config(client)
}

copts := MQTT.NewClientOptions()
copts.SetClientID(clientID)
copts.SetAutoReconnect(true)
copts.SetMaxReconnectInterval(1 * time.Second)
copts.SetOnConnectHandler(client.onConnect())
copts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Warn(logPrefix, " disconnected, reason: "+err.Error())
mqttClient.Connect()
})

client.ClientOptions = copts
Expand Down
22 changes: 21 additions & 1 deletion internal/common/mqtt/mqttconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ func (client *Client) onConnect() MQTT.OnConnectHandler {
return nil
}

// IsConnected checks if the client is connected or not
func (client *Client) IsConnected() bool {
if client.Client == nil {
return false
}

return client.Client.IsConnected()
}

// Disconnect disconnects the connection
func (client *Client) Disconnect(quiesce uint) {
if client.Client != nil {
client.Client.Disconnect(quiesce)
}
}

//StartMQTTClient is used to initiate the client and set the configuration
func StartMQTTClient(brokerURL string) {

Expand All @@ -75,15 +91,19 @@ func StartMQTTClient(brokerURL string) {
if connectErr != nil {
log.Warn(logPrefix, connectErr)
}

SetClient(clientConfig)

}

//Publish is used to publish the client data to the cloud
func Publish(client *Client, message Message, topic string) error {
func (client *Client) Publish(message Message, topic string) error {

log.Info(logPrefix, "Publishing the data to cloud")
payload, err := json.Marshal(message)
if err != nil {
log.Warn(logPrefix, "Error in Json Marshalling", err)
return err
}
mqttClient := client.Client
for mqttClient == nil {
Expand Down
50 changes: 45 additions & 5 deletions internal/controller/cloudsyncmgr/cloudsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package cloudsyncmgr

import (
"fmt"
"strings"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
)
Expand All @@ -29,15 +32,20 @@ const (

// CloudSync is the interface for starting Cloud synchronization
type CloudSync interface {
InitiateCloudSync(isCloudSet string) error
StartCloudSync(host string) error
//implemented by external REST API
RequestCloudSyncConf(message mqttmgr.Message, topic string, clientID string) string
}

//CloudSyncImpl struct
type CloudSyncImpl struct{}

var (
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
mqttClient *mqttmgr.Client
isCloudSyncSet bool
)

func init() {
Expand All @@ -50,12 +58,44 @@ func GetInstance() CloudSync {
return cloudsyncIns
}

// StartCloudSync starts a server in terms of CloudSync
// InitiateCloudSync initiate CloudSync
func (c *CloudSyncImpl) InitiateCloudSync(isCloudSet string) (err error) {
isCloudSyncSet = false
if len(isCloudSet) > 0 {
if strings.Compare(strings.ToLower(isCloudSet), "true") == 0 {
log.Println("CloudSync init set")
isCloudSyncSet = true
}
}
return nil
}

//StartCloudSync is used to start the sync by connecting to the broker
func (c *CloudSyncImpl) StartCloudSync(host string) (err error) {
if len(host) > 0 {
if isCloudSyncSet && len(host) > 0 {
log.Info("Starting the CLoudsync Mgr")
go mqttmgr.StartMQTTClient(host)
}
return
}

return nil
// RequestCloudSyncConf is configuration request handler
func (c *CloudSyncImpl) RequestCloudSyncConf(message mqttmgr.Message, topic string, clientID string) string {
log.Info(logPrefix, "Publishing the data to the cloud")
mqttClient = mqttmgr.GetClient()
mqttmgr.SetClientID(clientID)
resp := ""
if mqttClient.IsConnected() {
err := mqttClient.Publish(message, topic)
if err != nil {
errMsg := fmt.Sprintf("Error in publishing the data %s", err)
resp = errMsg
} else {
resp = "Data published successfully to Cloud"
}
} else {
resp = "Client not connected to Broker URL"
}

return resp
}
25 changes: 25 additions & 0 deletions internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion internal/orchestrationapi/mocks/mock_orchestration.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion internal/orchestrationapi/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

Expand Down Expand Up @@ -54,6 +55,7 @@ type Orche interface {
type OrcheExternalAPI interface {
RequestService(serviceInfo ReqeustService) ResponseService
verifier.Conf
RequestCloudSync(message mqtt.Message, topic string, clientID string) string
}

// OrcheInternalAPI is the interface implemented by internal REST API
Expand Down Expand Up @@ -215,7 +217,8 @@ func (o *orcheImpl) Start(deviceIDPath string, platform string, executionType st
resourceMonitorImpl.StartMonitoringResource()
o.discoverIns.StartDiscovery(deviceIDPath, platform, executionType)
o.storageIns.StartStorage("")
o.cloudsyncIns.StartCloudSync(os.Getenv("CLOUD_SYNC"))
cloudSyncState := os.Getenv("CLOUD_SYNC")
o.cloudsyncIns.InitiateCloudSync(cloudSyncState)
o.watcher.Watch(o)
o.Ready = true
time.Sleep(1000)
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrationapi/orchestration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestStart(t *testing.T) {
mockResourceutil.EXPECT().StartMonitoringResource(),
mockDiscovery.EXPECT().StartDiscovery(gomock.Eq(deviceIDPath), gomock.Eq(platform), gomock.Eq(executionType)),
mockStorage.EXPECT().StartStorage(gomock.Any()),
mockCloudSync.EXPECT().StartCloudSync(gomock.Any()),
mockCloudSync.EXPECT().InitiateCloudSync(gomock.Any()),
mockWatcher.EXPECT().Watch(gomock.Any()),
)

Expand Down
7 changes: 7 additions & 0 deletions internal/orchestrationapi/orchestrationapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/networkhelper"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/requestervalidator"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
Expand Down Expand Up @@ -128,6 +129,12 @@ func init() {
helper = dbhelper.GetInstance()
}

//RequestCloudSync handles the request for cloud syncing
func (orcheEngine *orcheImpl) RequestCloudSync(message mqtt.Message, topic string, clientID string) string {
log.Info("[RequestCloudSync]", "Requesting cloud sync")
return orcheEngine.cloudsyncIns.RequestCloudSyncConf(message, topic, clientID)
}

// RequestService handles service request (ex. offloading) from service application
func (orcheEngine *orcheImpl) RequestService(serviceInfo ReqeustService) ResponseService {
log.Printf("[RequestService] %v: %v\n", serviceInfo.ServiceName, serviceInfo.ServiceInfo)
Expand Down
Loading

0 comments on commit 2ef35ea

Please sign in to comment.