diff --git a/cloudevents/generic/options/mqtt/agentoptions.go b/cloudevents/generic/options/mqtt/agentoptions.go index 3c2753c8b..adc408afa 100644 --- a/cloudevents/generic/options/mqtt/agentoptions.go +++ b/cloudevents/generic/options/mqtt/agentoptions.go @@ -44,7 +44,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E if eventType.Action == types.ResyncRequestAction { // agent publishes event to spec resync topic to request to get resources spec from all sources - topic := strings.Replace(SpecResyncTopic, "+", o.clusterName, -1) + topic := strings.Replace(o.SpecResyncTopic, "+", o.clusterName, -1) return cloudeventscontext.WithTopic(ctx, topic), nil } @@ -54,7 +54,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return nil, err } - statusTopic := strings.Replace(StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1) + statusTopic := strings.Replace(o.StatusTopic, "+", fmt.Sprintf("%s", originalSource), 1) statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1) return cloudeventscontext.WithTopic(ctx, statusTopic), nil } @@ -71,9 +71,9 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ // receiving the resources spec from sources with spec topic - replaceNth(SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)}, + replaceNth(o.SpecTopic, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)}, // receiving the resources status resync request from sources with status resync topic - StatusResyncTopic: {QoS: byte(o.SubQoS)}, + o.StatusResyncTopic: {QoS: byte(o.SubQoS)}, }, }, ), diff --git a/cloudevents/generic/options/mqtt/agentoptions_test.go b/cloudevents/generic/options/mqtt/agentoptions_test.go index 1e47f51c0..c8e94379a 100644 --- a/cloudevents/generic/options/mqtt/agentoptions_test.go +++ b/cloudevents/generic/options/mqtt/agentoptions_test.go @@ -2,6 +2,7 @@ package mqtt import ( "context" + "os" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -17,6 +18,21 @@ var mockEventDataType = types.CloudEventsDataType{ } func TestAgentContext(t *testing.T) { + file, err := os.CreateTemp("", "mqtt-config-test-") + if err != nil { + t.Fatal(err) + } + defer os.Remove(file.Name()) + + if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil { + t.Fatal(err) + } + + options, err := BuildMQTTOptionsFromFlags(file.Name()) + if err != nil { + t.Fatal(err) + } + cases := []struct { name string event cloudevents.Event @@ -103,7 +119,10 @@ func TestAgentContext(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - agentOptions := &mqttAgentOptions{clusterName: "cluster1"} + agentOptions := &mqttAgentOptions{ + MQTTOptions: *options, + clusterName: "cluster1", + } ctx, err := agentOptions.WithContext(context.TODO(), c.event.Context) c.assertError(err) diff --git a/cloudevents/generic/options/mqtt/options.go b/cloudevents/generic/options/mqtt/options.go index da9a56c61..7081edd40 100644 --- a/cloudevents/generic/options/mqtt/options.go +++ b/cloudevents/generic/options/mqtt/options.go @@ -17,30 +17,34 @@ import ( ) const ( - // SpecTopic is a MQTT topic for resource spec. - SpecTopic = "sources/+/clusters/+/spec" + // defaultSpecTopic is a default MQTT topic for resource spec. + defaultSpecTopic = "sources/+/clusters/+/spec" - // StatusTopic is a MQTT topic for resource status. - StatusTopic = "sources/+/clusters/+/status" + // defaultStatusTopic is a default MQTT topic for resource status. + defaultStatusTopic = "sources/+/clusters/+/status" - // SpecResyncTopic is a MQTT topic for resource spec resync. - SpecResyncTopic = "sources/clusters/+/specresync" + // defaultSpecResyncTopic is a default MQTT topic for resource spec resync. + defaultSpecResyncTopic = "sources/clusters/+/specresync" - // StatusResyncTopic is a MQTT topic for resource status resync. - StatusResyncTopic = "sources/+/clusters/statusresync" + // defaultStatusResyncTopic is a default MQTT topic for resource status resync. + defaultStatusResyncTopic = "sources/+/clusters/statusresync" ) // MQTTOptions holds the options that are used to build MQTT client. type MQTTOptions struct { - BrokerHost string - Username string - Password string - CAFile string - ClientCertFile string - ClientKeyFile string - KeepAlive uint16 - PubQoS int - SubQoS int + BrokerHost string + Username string + Password string + CAFile string + ClientCertFile string + ClientKeyFile string + SpecTopic string + StatusTopic string + SpecResyncTopic string + StatusResyncTopic string + KeepAlive uint16 + PubQoS int + SubQoS int } // MQTTConfig holds the information needed to build connect to MQTT broker as a given user. @@ -62,17 +66,31 @@ type MQTTConfig struct { // KeepAlive is the keep alive time in seconds for MQTT clients, by default is 60s KeepAlive *uint16 `json:"keepAlive,omitempty" yaml:"keepAlive,omitempty"` + // PubQoS is the QoS for publish, by default is 1 PubQoS *int `json:"pubQoS,omitempty" yaml:"pubQoS,omitempty"` // SubQoS is the Qos for subscribe, by default is 1 SubQoS *int `json:"subQoS,omitempty" yaml:"subQoS,omitempty"` + + // SpecTopic is a MQTT topic for resource spec, by default is sources/+/clusters/+/spec. + SpecTopic string `json:"specTopic,omitempty" yaml:"specTopic,omitempty"` + // StatusTopic is a MQTT topic for resource status, by default is sources/+/clusters/+/status. + StatusTopic string `json:"statusTopic,omitempty" yaml:"statusTopic,omitempty"` + // SpecResyncTopic is a MQTT topic for resource spec resync, by default is sources/clusters/+/specresync. + SpecResyncTopic string `json:"specResyncTopic,omitempty" yaml:"specResyncTopic,omitempty"` + // StatusResyncTopic is a MQTT topic for resource status resync, by default is sources/+/clusters/statusresync. + StatusResyncTopic string `json:"statusResyncTopic,omitempty" yaml:"statusResyncTopic,omitempty"` } func NewMQTTOptions() *MQTTOptions { return &MQTTOptions{ - KeepAlive: 60, - PubQoS: 1, - SubQoS: 1, + SpecTopic: defaultSpecTopic, + StatusTopic: defaultStatusTopic, + SpecResyncTopic: defaultSpecResyncTopic, + StatusResyncTopic: defaultStatusResyncTopic, + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, } } @@ -101,15 +119,19 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { } options := &MQTTOptions{ - BrokerHost: config.BrokerHost, - Username: config.Username, - Password: config.Password, - CAFile: config.CAFile, - ClientCertFile: config.ClientCertFile, - ClientKeyFile: config.ClientKeyFile, - KeepAlive: 60, - PubQoS: 1, - SubQoS: 1, + BrokerHost: config.BrokerHost, + Username: config.Username, + Password: config.Password, + CAFile: config.CAFile, + ClientCertFile: config.ClientCertFile, + ClientKeyFile: config.ClientKeyFile, + SpecTopic: defaultSpecTopic, + StatusTopic: defaultStatusTopic, + SpecResyncTopic: defaultSpecResyncTopic, + StatusResyncTopic: defaultStatusResyncTopic, + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, } if config.KeepAlive != nil { @@ -124,6 +146,22 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { options.SubQoS = *config.SubQoS } + if config.SpecTopic != "" { + options.SpecTopic = config.SpecTopic + } + + if config.StatusTopic != "" { + options.StatusTopic = config.StatusTopic + } + + if config.SpecResyncTopic != "" { + options.SpecResyncTopic = config.SpecResyncTopic + } + + if config.StatusResyncTopic != "" { + options.SpecResyncTopic = config.StatusResyncTopic + } + return options, nil } diff --git a/cloudevents/generic/options/mqtt/options_test.go b/cloudevents/generic/options/mqtt/options_test.go index 2817c8530..7e6f3c279 100644 --- a/cloudevents/generic/options/mqtt/options_test.go +++ b/cloudevents/generic/options/mqtt/options_test.go @@ -1,7 +1,6 @@ package mqtt import ( - "log" "os" "reflect" "testing" @@ -10,7 +9,7 @@ import ( func TestBuildMQTTOptionsFromFlags(t *testing.T) { file, err := os.CreateTemp("", "mqtt-config-test-") if err != nil { - log.Fatal(err) + t.Fatal(err) } defer os.Remove(file.Name()) @@ -39,30 +38,42 @@ func TestBuildMQTTOptionsFromFlags(t *testing.T) { name: "default options", config: "{\"brokerHost\":\"test\"}", expectedOptions: &MQTTOptions{ - BrokerHost: "test", - KeepAlive: 60, - PubQoS: 1, - SubQoS: 1, + BrokerHost: "test", + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + SpecTopic: "sources/+/clusters/+/spec", + StatusTopic: "sources/+/clusters/+/status", + SpecResyncTopic: "sources/clusters/+/specresync", + StatusResyncTopic: "sources/+/clusters/statusresync", }, }, { name: "default options with yaml format", config: "brokerHost: test", expectedOptions: &MQTTOptions{ - BrokerHost: "test", - KeepAlive: 60, - PubQoS: 1, - SubQoS: 1, + BrokerHost: "test", + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + SpecTopic: "sources/+/clusters/+/spec", + StatusTopic: "sources/+/clusters/+/status", + SpecResyncTopic: "sources/clusters/+/specresync", + StatusResyncTopic: "sources/+/clusters/statusresync", }, }, { name: "customized options", config: "{\"brokerHost\":\"test\",\"keepAlive\":30,\"pubQoS\":0,\"subQoS\":2}", expectedOptions: &MQTTOptions{ - BrokerHost: "test", - KeepAlive: 30, - PubQoS: 0, - SubQoS: 2, + BrokerHost: "test", + KeepAlive: 30, + PubQoS: 0, + SubQoS: 2, + SpecTopic: "sources/+/clusters/+/spec", + StatusTopic: "sources/+/clusters/+/status", + SpecResyncTopic: "sources/clusters/+/specresync", + StatusResyncTopic: "sources/+/clusters/statusresync", }, }, } diff --git a/cloudevents/generic/options/mqtt/sourceoptions.go b/cloudevents/generic/options/mqtt/sourceoptions.go index 4b1d7a6ee..eef5a4a60 100644 --- a/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/cloudevents/generic/options/mqtt/sourceoptions.go @@ -43,7 +43,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. if eventType.Action == types.ResyncRequestAction { // source publishes event to status resync topic to request to get resources status from all clusters - return cloudeventscontext.WithTopic(ctx, strings.Replace(StatusResyncTopic, "+", o.sourceID, -1)), nil + return cloudeventscontext.WithTopic(ctx, strings.Replace(o.StatusResyncTopic, "+", o.sourceID, -1)), nil } clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName) @@ -52,7 +52,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. } // source publishes event to spec topic to send the resource spec to a specified cluster - specTopic := strings.Replace(SpecTopic, "+", o.sourceID, 1) + specTopic := strings.Replace(o.SpecTopic, "+", o.sourceID, 1) specTopic = strings.Replace(specTopic, "+", fmt.Sprintf("%s", clusterName), -1) return cloudeventscontext.WithTopic(ctx, specTopic), nil } @@ -69,9 +69,9 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ // receiving the resources status from agents with status topic - strings.Replace(StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)}, + strings.Replace(o.StatusTopic, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)}, // receiving the resources spec resync request from agents with spec resync topic - SpecResyncTopic: {QoS: byte(o.SubQoS)}, + o.SpecResyncTopic: {QoS: byte(o.SubQoS)}, }, }, ), diff --git a/cloudevents/generic/options/mqtt/sourceoptions_test.go b/cloudevents/generic/options/mqtt/sourceoptions_test.go index 69bc8fdc7..70d4d543b 100644 --- a/cloudevents/generic/options/mqtt/sourceoptions_test.go +++ b/cloudevents/generic/options/mqtt/sourceoptions_test.go @@ -2,6 +2,7 @@ package mqtt import ( "context" + "os" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -11,6 +12,21 @@ import ( ) func TestSourceContext(t *testing.T) { + file, err := os.CreateTemp("", "mqtt-config-test-") + if err != nil { + t.Fatal(err) + } + defer os.Remove(file.Name()) + + if err := os.WriteFile(file.Name(), []byte("{\"brokerHost\":\"test\"}"), 0644); err != nil { + t.Fatal(err) + } + + options, err := BuildMQTTOptionsFromFlags(file.Name()) + if err != nil { + t.Fatal(err) + } + cases := []struct { name string event cloudevents.Event @@ -94,7 +110,10 @@ func TestSourceContext(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - sourceOptions := &mqttSourceOptions{sourceID: "hub1"} + sourceOptions := &mqttSourceOptions{ + MQTTOptions: *options, + sourceID: "hub1", + } ctx, err := sourceOptions.WithContext(context.TODO(), c.event.Context) c.assertError(err)