Skip to content

Commit

Permalink
using config file instead of flag (#300)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored Nov 30, 2023
1 parent 6587397 commit 97a8a92
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 12 deletions.
87 changes: 76 additions & 11 deletions cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
)

const (
Expand All @@ -30,6 +30,7 @@ const (
StatusResyncTopic = "sources/+/clusters/statusresync"
)

// MQTTOptions holds the options that are used to build MQTT client.
type MQTTOptions struct {
BrokerHost string
Username string
Expand All @@ -42,6 +43,31 @@ type MQTTOptions struct {
SubQoS int
}

// MQTTConfig holds the information needed to build connect to MQTT broker as a given user.
type MQTTConfig struct {
// BrokerHost is the host of the MQTT broker (hostname:port).
BrokerHost string `json:"brokerHost" yaml:"brokerHost"`

// Username is the username for basic authentication to connect the MQTT broker.
Username string `json:"username,omitempty" yaml:"username,omitempty"`
// Password is the password for basic authentication to connect the MQTT broker.
Password string `json:"password,omitempty" yaml:"password,omitempty"`

// CAFile is the file path to a cert file for the MQTT broker certificate authority.
CAFile string `json:"caFile,omitempty" yaml:"caFile,omitempty"`
// ClientCertFile is the file path to a client cert file for TLS.
ClientCertFile string `json:"clientCertFile,omitempty" yaml:"clientCertFile,omitempty"`
// ClientKeyFile is the file path to a client key file for TLS.
ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"`

// KeepAlive is the keep alive time in seconds for MQTT clients, by default is 60s
KeepAlive *uint16 `json:"keepAlive,omitempty" yaml:"keepAlive,omitempty"`
// PubQoS is the QoS for publish, by default is 1
PubQoS *int `json:"pubQoS,omitempty" yaml:"pubQoS,omitempty"`
// SubQoS is the Qos for subscribe, by default is 1
SubQoS *int `json:"subQoS,omitempty" yaml:"subQoS,omitempty"`
}

func NewMQTTOptions() *MQTTOptions {
return &MQTTOptions{
KeepAlive: 60,
Expand All @@ -50,16 +76,55 @@ func NewMQTTOptions() *MQTTOptions {
}
}

func (o *MQTTOptions) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.BrokerHost, "mqtt-broker-host", o.BrokerHost, "The host of MQTT broker")
flags.StringVar(&o.Username, "mqtt-username", o.Username, "The username to connect the MQTT broker")
flags.StringVar(&o.Password, "mqtt-password", o.Password, "The password to connect the MQTT broker")
flags.StringVar(&o.CAFile, "mqtt-broke-ca", o.CAFile, "A file containing trusted CA certificates MQTT broker")
flags.StringVar(&o.ClientCertFile, "mqtt-client-certificate", o.ClientCertFile, "The MQTT client certificate file")
flags.StringVar(&o.ClientKeyFile, "mqtt-client-key", o.ClientKeyFile, "The MQTT client private key file")
flags.Uint16Var(&o.KeepAlive, "mqtt-keep-alive", o.KeepAlive, "Keep alive in seconds for MQTT clients")
flags.IntVar(&o.SubQoS, "mqtt-sub-qos", o.SubQoS, "The OoS for subscribe")
flags.IntVar(&o.PubQoS, "mqtt-pub-qos", o.PubQoS, "The Qos for publish")
// BuildMQTTOptionsFromFlags builds configs from a config filepath.
func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, err
}

config := &MQTTConfig{}
if err := yaml.Unmarshal(configData, config); err != nil {
return nil, err
}

if config.BrokerHost == "" {
return nil, fmt.Errorf("brokerHost is required")
}

if (config.ClientCertFile == "" && config.ClientKeyFile != "") ||
(config.ClientCertFile != "" && config.ClientKeyFile == "") {
return nil, fmt.Errorf("either both or none of clientCertFile and clientKeyFile must be set")
}
if config.ClientCertFile != "" && config.ClientKeyFile != "" && config.CAFile == "" {
return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile")
}

options := &MQTTOptions{
BrokerHost: config.BrokerHost,
Username: config.Username,
Password: config.Password,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
}

if config.KeepAlive != nil {
options.KeepAlive = *config.KeepAlive
}

if config.PubQoS != nil {
options.PubQoS = *config.PubQoS
}

if config.SubQoS != nil {
options.SubQoS = *config.SubQoS
}

return options, nil
}

func (o *MQTTOptions) GetNetConn() (net.Conn, error) {
Expand Down
88 changes: 88 additions & 0 deletions cloudevents/generic/options/mqtt/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package mqtt

import (
"log"
"os"
"reflect"
"testing"
)

func TestBuildMQTTOptionsFromFlags(t *testing.T) {
file, err := os.CreateTemp("", "mqtt-config-test-")
if err != nil {
log.Fatal(err)
}
defer os.Remove(file.Name())

cases := []struct {
name string
config string
expectedOptions *MQTTOptions
expectedErrorMsg string
}{
{
name: "empty config",
config: "",
expectedErrorMsg: "brokerHost is required",
},
{
name: "tls config without clientCertFile",
config: "{\"brokerHost\":\"test\",\"clientCertFile\":\"test\"}",
expectedErrorMsg: "either both or none of clientCertFile and clientKeyFile must be set",
},
{
name: "tls config without caFile",
config: "{\"brokerHost\":\"test\",\"clientCertFile\":\"test\",\"clientKeyFile\":\"test\"}",
expectedErrorMsg: "setting clientCertFile and clientKeyFile requires caFile",
},
{
name: "default options",
config: "{\"brokerHost\":\"test\"}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
},
},
{
name: "default options with yaml format",
config: "brokerHost: test",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 60,
PubQoS: 1,
SubQoS: 1,
},
},
{
name: "customized options",
config: "{\"brokerHost\":\"test\",\"keepAlive\":30,\"pubQoS\":0,\"subQoS\":2}",
expectedOptions: &MQTTOptions{
BrokerHost: "test",
KeepAlive: 30,
PubQoS: 0,
SubQoS: 2,
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if err := os.WriteFile(file.Name(), []byte(c.config), 0644); err != nil {
t.Fatal(err)
}

options, err := BuildMQTTOptionsFromFlags(file.Name())
if err != nil {
if err.Error() != c.expectedErrorMsg {
t.Errorf("unexpected err %v", err)
}
}

if !reflect.DeepEqual(options, c.expectedOptions) {
t.Errorf("unexpected options %v", options)
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/openshift/build-machinery-go v0.0.0-20230306181456-d321ffa04533
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
Expand Down Expand Up @@ -76,7 +77,6 @@ require (
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
Expand Down

0 comments on commit 97a8a92

Please sign in to comment.