Skip to content

Commit

Permalink
expose mqtt topics
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jan 10, 2024
1 parent f6404f3 commit 2cc525a
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 53 deletions.
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E

if eventType.Action == types.ResyncRequestAction {
// agent publishes event to spec resync topic to request to get resources spec from all sources
topic := strings.Replace(SpecResyncTopic, "+", o.clusterName, -1)
topic := strings.Replace(o.SpecResyncTopic, "+", o.clusterName, -1)
return cloudeventscontext.WithTopic(ctx, topic), nil
}

Expand All @@ -54,7 +54,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return nil, err
}

statusTopic := strings.Replace(StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1)
statusTopic := strings.Replace(o.StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1)
statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1)
return cloudeventscontext.WithTopic(ctx, statusTopic), nil
}
Expand All @@ -71,9 +71,9 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
// receiving the resources spec from sources with spec topic
replaceNth(SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)},
replaceNth(o.SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)},
// receiving the resources status resync request from sources with status resync topic
StatusResyncTopic: {QoS: byte(o.SubQoS)},
o.StatusResyncTopic: {QoS: byte(o.SubQoS)},
},
},
),
Expand Down
21 changes: 20 additions & 1 deletion cloudevents/generic/options/mqtt/agentoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"context"
"os"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -17,6 +18,21 @@ var mockEventDataType = types.CloudEventsDataType{
}

func TestAgentContext(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())

if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil {
t.Fatal(err)
}

options, err := BuildMQTTOptionsFromFlags(file.Name())
if err != nil {
t.Fatal(err)
}

cases := []struct {
name string
event cloudevents.Event
Expand Down Expand Up @@ -103,7 +119,10 @@ func TestAgentContext(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agentOptions := &mqttAgentOptions{clusterName: "cluster1"}
agentOptions := &mqttAgentOptions{
MQTTOptions: *options,
clusterName: "cluster1",
}
ctx, err := agentOptions.WithContext(context.TODO(), c.event.Context)
c.assertError(err)

Expand Down
96 changes: 67 additions & 29 deletions cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ import (
)

const (
// SpecTopic is a MQTT topic for resource spec.
SpecTopic = "sources/+/clusters/+/spec"
// defaultSpecTopic is a default MQTT topic for resource spec.
defaultSpecTopic = "sources/+/clusters/+/spec"

// StatusTopic is a MQTT topic for resource status.
StatusTopic = "sources/+/clusters/+/status"
// defaultStatusTopic is a default MQTT topic for resource status.
defaultStatusTopic = "sources/+/clusters/+/status"

// SpecResyncTopic is a MQTT topic for resource spec resync.
SpecResyncTopic = "sources/clusters/+/specresync"
// defaultSpecResyncTopic is a default MQTT topic for resource spec resync.
defaultSpecResyncTopic = "sources/clusters/+/specresync"

// StatusResyncTopic is a MQTT topic for resource status resync.
StatusResyncTopic = "sources/+/clusters/statusresync"
// defaultStatusResyncTopic is a default MQTT topic for resource status resync.
defaultStatusResyncTopic = "sources/+/clusters/statusresync"
)

// MQTTOptions holds the options that are used to build MQTT client.
type MQTTOptions struct {
BrokerHost string
Username string
Password string
CAFile string
ClientCertFile string
ClientKeyFile string
KeepAlive uint16
PubQoS int
SubQoS int
BrokerHost string
Username string
Password string
CAFile string
ClientCertFile string
ClientKeyFile string
SpecTopic string
StatusTopic string
SpecResyncTopic string
StatusResyncTopic string
KeepAlive uint16
PubQoS int
SubQoS int
}

// MQTTConfig holds the information needed to build connect to MQTT broker as a given user.
Expand All @@ -62,17 +66,31 @@ type MQTTConfig struct {

// KeepAlive is the keep alive time in seconds for MQTT clients, by default is 60s
KeepAlive *uint16 `json:"keepAlive,omitempty" yaml:"keepAlive,omitempty"`

// PubQoS is the QoS for publish, by default is 1
PubQoS *int `json:"pubQoS,omitempty" yaml:"pubQoS,omitempty"`
// SubQoS is the Qos for subscribe, by default is 1
SubQoS *int `json:"subQoS,omitempty" yaml:"subQoS,omitempty"`

// SpecTopic is a MQTT topic for resource spec, by default is sources/+/clusters/+/spec.
SpecTopic string `json:"specTopic,omitempty" yaml:"specTopic,omitempty"`
// StatusTopic is a MQTT topic for resource status, by default is sources/+/clusters/+/status.
StatusTopic string `json:"statusTopic,omitempty" yaml:"statusTopic,omitempty"`
// SpecResyncTopic is a MQTT topic for resource spec resync, by default is sources/clusters/+/specresync.
SpecResyncTopic string `json:"specResyncTopic,omitempty" yaml:"specResyncTopic,omitempty"`
// StatusResyncTopic is a MQTT topic for resource status resync, by default is sources/+/clusters/statusresync.
StatusResyncTopic string `json:"statusResyncTopic,omitempty" yaml:"statusResyncTopic,omitempty"`
}

func NewMQTTOptions() *MQTTOptions {
return &MQTTOptions{
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: defaultSpecTopic,
StatusTopic: defaultStatusTopic,
SpecResyncTopic: defaultSpecResyncTopic,
StatusResyncTopic: defaultStatusResyncTopic,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
}
}

Expand Down Expand Up @@ -101,15 +119,19 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
}

options := &MQTTOptions{
BrokerHost: config.BrokerHost,
Username: config.Username,
Password: config.Password,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: config.BrokerHost,
Username: config.Username,
Password: config.Password,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
SpecTopic: defaultSpecTopic,
StatusTopic: defaultStatusTopic,
SpecResyncTopic: defaultSpecResyncTopic,
StatusResyncTopic: defaultStatusResyncTopic,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
}

if config.KeepAlive != nil {
Expand All @@ -124,6 +146,22 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
options.SubQoS = *config.SubQoS
}

if config.SpecTopic != "" {
options.SpecTopic = config.SpecTopic
}

if config.StatusTopic != "" {
options.StatusTopic = config.StatusTopic
}

if config.SpecResyncTopic != "" {
options.SpecResyncTopic = config.SpecResyncTopic
}

if config.StatusResyncTopic != "" {
options.SpecResyncTopic = config.StatusResyncTopic
}

return options, nil
}

Expand Down
39 changes: 25 additions & 14 deletions cloudevents/generic/options/mqtt/options_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mqtt

import (
"log"
"os"
"reflect"
"testing"
Expand All @@ -10,7 +9,7 @@ import (
func TestBuildMQTTOptionsFromFlags(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer os.Remove(file.Name())

Expand Down Expand Up @@ -39,30 +38,42 @@ func TestBuildMQTTOptionsFromFlags(t *testing.T) {
name: "default options",
config: "{\"brokerHost\":\"test\"}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
{
name: "default options with yaml format",
config: "brokerHost: test",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
{
name: "customized options",
config: "{\"brokerHost\":\"test\",\"keepAlive\":30,\"pubQoS\":0,\"subQoS\":2}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 30,
PubQoS: 0,
SubQoS: 2,
BrokerHost: "test",
KeepAlive: 30,
PubQoS: 0,
SubQoS: 2,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
}
Expand Down
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.

if eventType.Action == types.ResyncRequestAction {
// source publishes event to status resync topic to request to get resources status from all clusters
return cloudeventscontext.WithTopic(ctx, strings.Replace(StatusResyncTopic, "+", o.sourceID, -1)), nil
return cloudeventscontext.WithTopic(ctx, strings.Replace(o.StatusResyncTopic, "+", o.sourceID, -1)), nil
}

clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
Expand All @@ -52,7 +52,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
}

// source publishes event to spec topic to send the resource spec to a specified cluster
specTopic := strings.Replace(SpecTopic, "+", o.sourceID, 1)
specTopic := strings.Replace(o.SpecTopic, "+", o.sourceID, 1)
specTopic = strings.Replace(specTopic, "+", fmt.Sprintf("%s", clusterName), -1)
return cloudeventscontext.WithTopic(ctx, specTopic), nil
}
Expand All @@ -69,9 +69,9 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
// receiving the resources status from agents with status topic
strings.Replace(StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)},
strings.Replace(o.StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)},
// receiving the resources spec resync request from agents with spec resync topic
SpecResyncTopic: {QoS: byte(o.SubQoS)},
o.SpecResyncTopic: {QoS: byte(o.SubQoS)},
},
},
),
Expand Down
21 changes: 20 additions & 1 deletion cloudevents/generic/options/mqtt/sourceoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"context"
"os"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -11,6 +12,21 @@ import (
)

func TestSourceContext(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())

if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil {
t.Fatal(err)
}

options, err := BuildMQTTOptionsFromFlags(file.Name())
if err != nil {
t.Fatal(err)
}

cases := []struct {
name string
event cloudevents.Event
Expand Down Expand Up @@ -94,7 +110,10 @@ func TestSourceContext(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
sourceOptions := &mqttSourceOptions{sourceID: "hub1"}
sourceOptions := &mqttSourceOptions{
MQTTOptions: *options,
sourceID: "hub1",
}
ctx, err := sourceOptions.WithContext(context.TODO(), c.event.Context)
c.assertError(err)

Expand Down

0 comments on commit 2cc525a

Please sign in to comment.