Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 expose mqtt topics #312

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E

if eventType.Action == types.ResyncRequestAction {
// agent publishes event to spec resync topic to request to get resources spec from all sources
topic := strings.Replace(SpecResyncTopic, "+", o.clusterName, -1)
topic := strings.Replace(o.SpecResyncTopic, "+", o.clusterName, -1)
return cloudeventscontext.WithTopic(ctx, topic), nil
}

Expand All @@ -54,7 +54,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return nil, err
}

statusTopic := strings.Replace(StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1)
statusTopic := strings.Replace(o.StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1)
statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1)
return cloudeventscontext.WithTopic(ctx, statusTopic), nil
}
Expand All @@ -71,9 +71,9 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
// receiving the resources spec from sources with spec topic
replaceNth(SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)},
replaceNth(o.SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)},
// receiving the resources status resync request from sources with status resync topic
StatusResyncTopic: {QoS: byte(o.SubQoS)},
o.StatusResyncTopic: {QoS: byte(o.SubQoS)},
},
},
),
Expand Down
21 changes: 20 additions & 1 deletion cloudevents/generic/options/mqtt/agentoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"context"
"os"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -17,6 +18,21 @@ var mockEventDataType = types.CloudEventsDataType{
}

func TestAgentContext(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())

if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil {
t.Fatal(err)
}

options, err := BuildMQTTOptionsFromFlags(file.Name())
if err != nil {
t.Fatal(err)
}

cases := []struct {
name string
event cloudevents.Event
Expand Down Expand Up @@ -103,7 +119,10 @@ func TestAgentContext(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agentOptions := &mqttAgentOptions{clusterName: "cluster1"}
agentOptions := &mqttAgentOptions{
MQTTOptions: *options,
clusterName: "cluster1",
}
ctx, err := agentOptions.WithContext(context.TODO(), c.event.Context)
c.assertError(err)

Expand Down
96 changes: 67 additions & 29 deletions cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ import (
)

const (
// SpecTopic is a MQTT topic for resource spec.
SpecTopic = "sources/+/clusters/+/spec"
// defaultSpecTopic is a default MQTT topic for resource spec.
defaultSpecTopic = "sources/+/clusters/+/spec"

// StatusTopic is a MQTT topic for resource status.
StatusTopic = "sources/+/clusters/+/status"
// defaultStatusTopic is a default MQTT topic for resource status.
defaultStatusTopic = "sources/+/clusters/+/status"

// SpecResyncTopic is a MQTT topic for resource spec resync.
SpecResyncTopic = "sources/clusters/+/specresync"
// defaultSpecResyncTopic is a default MQTT topic for resource spec resync.
defaultSpecResyncTopic = "sources/clusters/+/specresync"

// StatusResyncTopic is a MQTT topic for resource status resync.
StatusResyncTopic = "sources/+/clusters/statusresync"
// defaultStatusResyncTopic is a default MQTT topic for resource status resync.
defaultStatusResyncTopic = "sources/+/clusters/statusresync"
)

// MQTTOptions holds the options that are used to build MQTT client.
type MQTTOptions struct {
BrokerHost string
Username string
Password string
CAFile string
ClientCertFile string
ClientKeyFile string
KeepAlive uint16
PubQoS int
SubQoS int
BrokerHost string
Username string
Password string
CAFile string
ClientCertFile string
ClientKeyFile string
SpecTopic string
StatusTopic string
SpecResyncTopic string
StatusResyncTopic string
KeepAlive uint16
PubQoS int
SubQoS int
}

// MQTTConfig holds the information needed to build connect to MQTT broker as a given user.
Expand All @@ -62,17 +66,31 @@ type MQTTConfig struct {

// KeepAlive is the keep alive time in seconds for MQTT clients, by default is 60s
KeepAlive *uint16 `json:"keepAlive,omitempty" yaml:"keepAlive,omitempty"`

// PubQoS is the QoS for publish, by default is 1
PubQoS *int `json:"pubQoS,omitempty" yaml:"pubQoS,omitempty"`
// SubQoS is the Qos for subscribe, by default is 1
SubQoS *int `json:"subQoS,omitempty" yaml:"subQoS,omitempty"`

// SpecTopic is a MQTT topic for resource spec, by default is sources/+/clusters/+/spec.
SpecTopic string `json:"specTopic,omitempty" yaml:"specTopic,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about a topics struct and status/spect/statusResync in it. I feel like the struct will be reused by other messaging system

// StatusTopic is a MQTT topic for resource status, by default is sources/+/clusters/+/status.
StatusTopic string `json:"statusTopic,omitempty" yaml:"statusTopic,omitempty"`
// SpecResyncTopic is a MQTT topic for resource spec resync, by default is sources/clusters/+/specresync.
SpecResyncTopic string `json:"specResyncTopic,omitempty" yaml:"specResyncTopic,omitempty"`
// StatusResyncTopic is a MQTT topic for resource status resync, by default is sources/+/clusters/statusresync.
StatusResyncTopic string `json:"statusResyncTopic,omitempty" yaml:"statusResyncTopic,omitempty"`
}

func NewMQTTOptions() *MQTTOptions {
return &MQTTOptions{
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: defaultSpecTopic,
StatusTopic: defaultStatusTopic,
SpecResyncTopic: defaultSpecResyncTopic,
StatusResyncTopic: defaultStatusResyncTopic,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
}
}

Expand Down Expand Up @@ -101,15 +119,19 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
}

options := &MQTTOptions{
BrokerHost: config.BrokerHost,
Username: config.Username,
Password: config.Password,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: config.BrokerHost,
Username: config.Username,
Password: config.Password,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
SpecTopic: defaultSpecTopic,
StatusTopic: defaultStatusTopic,
SpecResyncTopic: defaultSpecResyncTopic,
StatusResyncTopic: defaultStatusResyncTopic,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
}

if config.KeepAlive != nil {
Expand All @@ -124,6 +146,22 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
options.SubQoS = *config.SubQoS
}

if config.SpecTopic != "" {
options.SpecTopic = config.SpecTopic
}

if config.StatusTopic != "" {
options.StatusTopic = config.StatusTopic
}

if config.SpecResyncTopic != "" {
options.SpecResyncTopic = config.SpecResyncTopic
}

if config.StatusResyncTopic != "" {
options.SpecResyncTopic = config.StatusResyncTopic
}

return options, nil
}

Expand Down
39 changes: 25 additions & 14 deletions cloudevents/generic/options/mqtt/options_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mqtt

import (
"log"
"os"
"reflect"
"testing"
Expand All @@ -10,7 +9,7 @@ import (
func TestBuildMQTTOptionsFromFlags(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer os.Remove(file.Name())

Expand Down Expand Up @@ -39,30 +38,42 @@ func TestBuildMQTTOptionsFromFlags(t *testing.T) {
name: "default options",
config: "{\"brokerHost\":\"test\"}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
{
name: "default options with yaml format",
config: "brokerHost: test",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
{
name: "customized options",
config: "{\"brokerHost\":\"test\",\"keepAlive\":30,\"pubQoS\":0,\"subQoS\":2}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 30,
PubQoS: 0,
SubQoS: 2,
BrokerHost: "test",
KeepAlive: 30,
PubQoS: 0,
SubQoS: 2,
SpecTopic: "sources/+/clusters/+/spec",
StatusTopic: "sources/+/clusters/+/status",
SpecResyncTopic: "sources/clusters/+/specresync",
StatusResyncTopic: "sources/+/clusters/statusresync",
},
},
}
Expand Down
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.

if eventType.Action == types.ResyncRequestAction {
// source publishes event to status resync topic to request to get resources status from all clusters
return cloudeventscontext.WithTopic(ctx, strings.Replace(StatusResyncTopic, "+", o.sourceID, -1)), nil
return cloudeventscontext.WithTopic(ctx, strings.Replace(o.StatusResyncTopic, "+", o.sourceID, -1)), nil
}

clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
Expand All @@ -52,7 +52,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
}

// source publishes event to spec topic to send the resource spec to a specified cluster
specTopic := strings.Replace(SpecTopic, "+", o.sourceID, 1)
specTopic := strings.Replace(o.SpecTopic, "+", o.sourceID, 1)
specTopic = strings.Replace(specTopic, "+", fmt.Sprintf("%s", clusterName), -1)
return cloudeventscontext.WithTopic(ctx, specTopic), nil
}
Expand All @@ -69,9 +69,9 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
// receiving the resources status from agents with status topic
strings.Replace(StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)},
strings.Replace(o.StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)},
// receiving the resources spec resync request from agents with spec resync topic
SpecResyncTopic: {QoS: byte(o.SubQoS)},
o.SpecResyncTopic: {QoS: byte(o.SubQoS)},
},
},
),
Expand Down
21 changes: 20 additions & 1 deletion cloudevents/generic/options/mqtt/sourceoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"context"
"os"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -11,6 +12,21 @@ import (
)

func TestSourceContext(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())

if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil {
t.Fatal(err)
}

options, err := BuildMQTTOptionsFromFlags(file.Name())
if err != nil {
t.Fatal(err)
}

cases := []struct {
name string
event cloudevents.Event
Expand Down Expand Up @@ -94,7 +110,10 @@ func TestSourceContext(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
sourceOptions := &mqttSourceOptions{sourceID: "hub1"}
sourceOptions := &mqttSourceOptions{
MQTTOptions: *options,
sourceID: "hub1",
}
ctx, err := sourceOptions.WithContext(context.TODO(), c.event.Context)
c.assertError(err)

Expand Down
Loading