Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CloudSync] Adding the API to be exposed to thirdparty for Data Publish #446

Merged
merged 1 commit into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RUN_OPTIONS += -e SECURE=true
endif

ifeq ($CONFIG_CLOUD_SYNC),y)
RUN_OPTIONS += -e CLOUD_SYNC=""
RUN_OPTIONS += -e CLOUD_SYNC=true
endif

# Go parameters
Expand Down
18 changes: 16 additions & 2 deletions internal/common/mqtt/mqttconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,23 @@ type Message struct {
Payload string
}

var (
mqttClient *Client
)

// Config represents an attribute config setter for the `Client`.
type Config func(*Client)

//SetClient sets the client
func SetClient(client *Client) {
mqttClient = client
}

//GetClient gets the client set
func GetClient() *Client {
return mqttClient
}

// SetClientID sets the mqtt client id.
func SetClientID(id string) Config {
return func(c *Client) {
Expand Down Expand Up @@ -78,20 +92,20 @@ func (c *Client) SetBrokerURL(protocol string) string {
// NewClient returns a configured `Client`. Is mandatory
func NewClient(configs ...Config) (*Client, error) {
client := &Client{
Qos: byte(0),
Qos: byte(1),
}

for _, config := range configs {
config(client)
}

copts := MQTT.NewClientOptions()
copts.SetClientID(clientID)
copts.SetAutoReconnect(true)
copts.SetMaxReconnectInterval(1 * time.Second)
copts.SetOnConnectHandler(client.onConnect())
copts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Warn(logPrefix, " disconnected, reason: "+err.Error())
mqttClient.Connect()
})

client.ClientOptions = copts
Expand Down
22 changes: 21 additions & 1 deletion internal/common/mqtt/mqttconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ func (client *Client) onConnect() MQTT.OnConnectHandler {
return nil
}

// IsConnected checks if the client is connected or not
func (client *Client) IsConnected() bool {
if client.Client == nil {
return false
}

return client.Client.IsConnected()
}

// Disconnect disconnects the connection
func (client *Client) Disconnect(quiesce uint) {
if client.Client != nil {
client.Client.Disconnect(quiesce)
}
}

//StartMQTTClient is used to initiate the client and set the configuration
func StartMQTTClient(brokerURL string) {

Expand All @@ -75,15 +91,19 @@ func StartMQTTClient(brokerURL string) {
if connectErr != nil {
log.Warn(logPrefix, connectErr)
}

SetClient(clientConfig)

}

//Publish is used to publish the client data to the cloud
func Publish(client *Client, message Message, topic string) error {
func (client *Client) Publish(message Message, topic string) error {

log.Info(logPrefix, "Publishing the data to cloud")
payload, err := json.Marshal(message)
if err != nil {
log.Warn(logPrefix, "Error in Json Marshalling", err)
return err
}
mqttClient := client.Client
for mqttClient == nil {
Expand Down
50 changes: 45 additions & 5 deletions internal/controller/cloudsyncmgr/cloudsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package cloudsyncmgr

import (
"fmt"
"strings"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
mqttmgr "github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
)
Expand All @@ -29,15 +32,20 @@ const (

// CloudSync is the interface for starting Cloud synchronization
type CloudSync interface {
InitiateCloudSync(isCloudSet string) error
StartCloudSync(host string) error
//implemented by external REST API
RequestCloudSyncConf(message mqttmgr.Message, topic string, clientID string) string
}

//CloudSyncImpl struct
type CloudSyncImpl struct{}

var (
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
cloudsyncIns *CloudSyncImpl
log = logmgr.GetInstance()
mqttClient *mqttmgr.Client
isCloudSyncSet bool
)

func init() {
Expand All @@ -50,12 +58,44 @@ func GetInstance() CloudSync {
return cloudsyncIns
}

// StartCloudSync starts a server in terms of CloudSync
// InitiateCloudSync initiate CloudSync
func (c *CloudSyncImpl) InitiateCloudSync(isCloudSet string) (err error) {
isCloudSyncSet = false
if len(isCloudSet) > 0 {
if strings.Compare(strings.ToLower(isCloudSet), "true") == 0 {
log.Println("CloudSync init set")
isCloudSyncSet = true
}
}
return nil
}

//StartCloudSync is used to start the sync by connecting to the broker
func (c *CloudSyncImpl) StartCloudSync(host string) (err error) {
if len(host) > 0 {
if isCloudSyncSet && len(host) > 0 {
log.Info("Starting the CLoudsync Mgr")
go mqttmgr.StartMQTTClient(host)
}
return
}

return nil
// RequestCloudSyncConf is configuration request handler
func (c *CloudSyncImpl) RequestCloudSyncConf(message mqttmgr.Message, topic string, clientID string) string {
log.Info(logPrefix, "Publishing the data to the cloud")
mqttClient = mqttmgr.GetClient()
mqttmgr.SetClientID(clientID)
resp := ""
if mqttClient.IsConnected() {
err := mqttClient.Publish(message, topic)
if err != nil {
errMsg := fmt.Sprintf("Error in publishing the data %s", err)
resp = errMsg
} else {
resp = "Data published successfully to Cloud"
}
} else {
resp = "Client not connected to Broker URL"
}

return resp
}
25 changes: 25 additions & 0 deletions internal/controller/cloudsyncmgr/mocks/mocks_cloudsync.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion internal/orchestrationapi/mocks/mock_orchestration.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion internal/orchestrationapi/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/logmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/storagemgr"

Expand Down Expand Up @@ -54,6 +55,7 @@ type Orche interface {
type OrcheExternalAPI interface {
RequestService(serviceInfo ReqeustService) ResponseService
verifier.Conf
RequestCloudSync(message mqtt.Message, topic string, clientID string) string
}

// OrcheInternalAPI is the interface implemented by internal REST API
Expand Down Expand Up @@ -215,7 +217,8 @@ func (o *orcheImpl) Start(deviceIDPath string, platform string, executionType st
resourceMonitorImpl.StartMonitoringResource()
o.discoverIns.StartDiscovery(deviceIDPath, platform, executionType)
o.storageIns.StartStorage("")
o.cloudsyncIns.StartCloudSync(os.Getenv("CLOUD_SYNC"))
cloudSyncState := os.Getenv("CLOUD_SYNC")
o.cloudsyncIns.InitiateCloudSync(cloudSyncState)
o.watcher.Watch(o)
o.Ready = true
time.Sleep(1000)
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrationapi/orchestration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestStart(t *testing.T) {
mockResourceutil.EXPECT().StartMonitoringResource(),
mockDiscovery.EXPECT().StartDiscovery(gomock.Eq(deviceIDPath), gomock.Eq(platform), gomock.Eq(executionType)),
mockStorage.EXPECT().StartStorage(gomock.Any()),
mockCloudSync.EXPECT().StartCloudSync(gomock.Any()),
mockCloudSync.EXPECT().InitiateCloudSync(gomock.Any()),
mockWatcher.EXPECT().Watch(gomock.Any()),
)

Expand Down
7 changes: 7 additions & 0 deletions internal/orchestrationapi/orchestrationapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/lf-edge/edge-home-orchestration-go/internal/common/commandvalidator"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/mqtt"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/networkhelper"
"github.com/lf-edge/edge-home-orchestration-go/internal/common/requestervalidator"
"github.com/lf-edge/edge-home-orchestration-go/internal/controller/cloudsyncmgr"
Expand Down Expand Up @@ -128,6 +129,12 @@ func init() {
helper = dbhelper.GetInstance()
}

//RequestCloudSync handles the request for cloud syncing
func (orcheEngine *orcheImpl) RequestCloudSync(message mqtt.Message, topic string, clientID string) string {
log.Info("[RequestCloudSync]", "Requesting cloud sync")
return orcheEngine.cloudsyncIns.RequestCloudSyncConf(message, topic, clientID)
}

// RequestService handles service request (ex. offloading) from service application
func (orcheEngine *orcheImpl) RequestService(serviceInfo ReqeustService) ResponseService {
log.Printf("[RequestService] %v: %v\n", serviceInfo.ServiceName, serviceInfo.ServiceInfo)
Expand Down
Loading