diff --git a/cloudevents/generic/options/mqtt/options.go b/cloudevents/generic/options/mqtt/options.go index 89992b00c..da9a56c61 100644 --- a/cloudevents/generic/options/mqtt/options.go +++ b/cloudevents/generic/options/mqtt/options.go @@ -13,7 +13,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" - "github.com/spf13/pflag" + "gopkg.in/yaml.v2" ) const ( @@ -30,6 +30,7 @@ const ( StatusResyncTopic = "sources/+/clusters/statusresync" ) +// MQTTOptions holds the options that are used to build MQTT client. type MQTTOptions struct { BrokerHost string Username string @@ -42,6 +43,31 @@ type MQTTOptions struct { SubQoS int } +// MQTTConfig holds the information needed to build connect to MQTT broker as a given user. +type MQTTConfig struct { + // BrokerHost is the host of the MQTT broker (hostname:port). + BrokerHost string `json:"brokerHost" yaml:"brokerHost"` + + // Username is the username for basic authentication to connect the MQTT broker. + Username string `json:"username,omitempty" yaml:"username,omitempty"` + // Password is the password for basic authentication to connect the MQTT broker. + Password string `json:"password,omitempty" yaml:"password,omitempty"` + + // CAFile is the file path to a cert file for the MQTT broker certificate authority. + CAFile string `json:"caFile,omitempty" yaml:"caFile,omitempty"` + // ClientCertFile is the file path to a client cert file for TLS. + ClientCertFile string `json:"clientCertFile,omitempty" yaml:"clientCertFile,omitempty"` + // ClientKeyFile is the file path to a client key file for TLS. + ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"` + + // KeepAlive is the keep alive time in seconds for MQTT clients, by default is 60s + KeepAlive *uint16 `json:"keepAlive,omitempty" yaml:"keepAlive,omitempty"` + // PubQoS is the QoS for publish, by default is 1 + PubQoS *int `json:"pubQoS,omitempty" yaml:"pubQoS,omitempty"` + // SubQoS is the Qos for subscribe, by default is 1 + SubQoS *int `json:"subQoS,omitempty" yaml:"subQoS,omitempty"` +} + func NewMQTTOptions() *MQTTOptions { return &MQTTOptions{ KeepAlive: 60, @@ -50,16 +76,55 @@ func NewMQTTOptions() *MQTTOptions { } } -func (o *MQTTOptions) AddFlags(flags *pflag.FlagSet) { - flags.StringVar(&o.BrokerHost, "mqtt-broker-host", o.BrokerHost, "The host of MQTT broker") - flags.StringVar(&o.Username, "mqtt-username", o.Username, "The username to connect the MQTT broker") - flags.StringVar(&o.Password, "mqtt-password", o.Password, "The password to connect the MQTT broker") - flags.StringVar(&o.CAFile, "mqtt-broke-ca", o.CAFile, "A file containing trusted CA certificates MQTT broker") - flags.StringVar(&o.ClientCertFile, "mqtt-client-certificate", o.ClientCertFile, "The MQTT client certificate file") - flags.StringVar(&o.ClientKeyFile, "mqtt-client-key", o.ClientKeyFile, "The MQTT client private key file") - flags.Uint16Var(&o.KeepAlive, "mqtt-keep-alive", o.KeepAlive, "Keep alive in seconds for MQTT clients") - flags.IntVar(&o.SubQoS, "mqtt-sub-qos", o.SubQoS, "The OoS for subscribe") - flags.IntVar(&o.PubQoS, "mqtt-pub-qos", o.PubQoS, "The Qos for publish") +// BuildMQTTOptionsFromFlags builds configs from a config filepath. +func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, err + } + + config := &MQTTConfig{} + if err := yaml.Unmarshal(configData, config); err != nil { + return nil, err + } + + if config.BrokerHost == "" { + return nil, fmt.Errorf("brokerHost is required") + } + + if (config.ClientCertFile == "" && config.ClientKeyFile != "") || + (config.ClientCertFile != "" && config.ClientKeyFile == "") { + return nil, fmt.Errorf("either both or none of clientCertFile and clientKeyFile must be set") + } + if config.ClientCertFile != "" && config.ClientKeyFile != "" && config.CAFile == "" { + return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile") + } + + options := &MQTTOptions{ + BrokerHost: config.BrokerHost, + Username: config.Username, + Password: config.Password, + CAFile: config.CAFile, + ClientCertFile: config.ClientCertFile, + ClientKeyFile: config.ClientKeyFile, + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + } + + if config.KeepAlive != nil { + options.KeepAlive = *config.KeepAlive + } + + if config.PubQoS != nil { + options.PubQoS = *config.PubQoS + } + + if config.SubQoS != nil { + options.SubQoS = *config.SubQoS + } + + return options, nil } func (o *MQTTOptions) GetNetConn() (net.Conn, error) { diff --git a/cloudevents/generic/options/mqtt/options_test.go b/cloudevents/generic/options/mqtt/options_test.go new file mode 100644 index 000000000..2817c8530 --- /dev/null +++ b/cloudevents/generic/options/mqtt/options_test.go @@ -0,0 +1,88 @@ +package mqtt + +import ( + "log" + "os" + "reflect" + "testing" +) + +func TestBuildMQTTOptionsFromFlags(t *testing.T) { + file, err := os.CreateTemp("", "mqtt-config-test-") + if err != nil { + log.Fatal(err) + } + defer os.Remove(file.Name()) + + cases := []struct { + name string + config string + expectedOptions *MQTTOptions + expectedErrorMsg string + }{ + { + name: "empty config", + config: "", + expectedErrorMsg: "brokerHost is required", + }, + { + name: "tls config without clientCertFile", + config: "{\"brokerHost\":\"test\",\"clientCertFile\":\"test\"}", + expectedErrorMsg: "either both or none of clientCertFile and clientKeyFile must be set", + }, + { + name: "tls config without caFile", + config: "{\"brokerHost\":\"test\",\"clientCertFile\":\"test\",\"clientKeyFile\":\"test\"}", + expectedErrorMsg: "setting clientCertFile and clientKeyFile requires caFile", + }, + { + name: "default options", + config: "{\"brokerHost\":\"test\"}", + expectedOptions: &MQTTOptions{ + BrokerHost: "test", + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + }, + }, + { + name: "default options with yaml format", + config: "brokerHost: test", + expectedOptions: &MQTTOptions{ + BrokerHost: "test", + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + }, + }, + { + name: "customized options", + config: "{\"brokerHost\":\"test\",\"keepAlive\":30,\"pubQoS\":0,\"subQoS\":2}", + expectedOptions: &MQTTOptions{ + BrokerHost: "test", + KeepAlive: 30, + PubQoS: 0, + SubQoS: 2, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if err := os.WriteFile(file.Name(), []byte(c.config), 0644); err != nil { + t.Fatal(err) + } + + options, err := BuildMQTTOptionsFromFlags(file.Name()) + if err != nil { + if err.Error() != c.expectedErrorMsg { + t.Errorf("unexpected err %v", err) + } + } + + if !reflect.DeepEqual(options, c.expectedOptions) { + t.Errorf("unexpected options %v", options) + } + }) + } +} diff --git a/go.mod b/go.mod index b79ad3d8f..dc09448b9 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/openshift/build-machinery-go v0.0.0-20230306181456-d321ffa04533 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.1 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.26.1 k8s.io/apimachinery v0.26.1 k8s.io/client-go v0.26.1 @@ -76,7 +77,6 @@ require ( google.golang.org/protobuf v1.28.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.26.1 // indirect k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect