Skip to content

Commit

Permalink
Merge pull request #71 from zluudg/leon/globally-update-mqtt-topic
Browse files Browse the repository at this point in the history
Support global config of mqtt-source being pushed from core
  • Loading branch information
johanix authored Nov 12, 2024
2 parents 64b0bd6 + f27cfe9 commit 776286b
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 12 deletions.
7 changes: 4 additions & 3 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/spf13/viper"
)

func (td *PopData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir.WBGlist, error) {
func (td *PopData) BootstrapMqttSource(src SourceConf) (*tapir.WBGlist, error) {
// Initialize the API client
api := &tapir.ApiClient{
BaseUrl: fmt.Sprintf(src.BootstrapUrl, src.Bootstrap[0]), // Must specify a valid BaseUrl
Expand All @@ -30,8 +30,9 @@ func (td *PopData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
POPExiter("BootstrapMqttSource error: missing config key: certs.certdir")
}
// cert := cd + "/" + certname
cert := cd + "/" + "tapir-pop"
tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), cert+".key", cert+".crt")
key := viper.GetString("certs.tapir-pop.key")
cert := viper.GetString("certs.tapir-pop.cert")
tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), key, cert)
if err != nil {
POPExiter("BootstrapMqttSource: Error: Could not set up TLS: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
ApiServer ApiserverConf
DnsEngine DnsengineConf
BootstrapServer BootstrapServerConf
KeyStore KeystoreConf
Sources map[string]SourceConf
Policy PolicyConf
Log struct {
Expand Down Expand Up @@ -77,13 +78,18 @@ type ServerConf struct {
Port string `validate:"required"`
}

type KeystoreConf struct {
Path string `validate:"required,file"`
}

type SourceConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Description string `validate:"required"`
Type string `validate:"required"`
Format string `validate:"required"`
Source string `validate:"required"`
Immutable bool
Topic string
ValidatorKey string
Bootstrap []string
Expand Down
57 changes: 56 additions & 1 deletion configupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,60 @@ func (pd *PopData) ConfigUpdater(conf *Config, stopch chan struct{}) {
}

func (pd *PopData) ProcessTapirGlobalConfig(gconfig tapir.GlobalConfig) {
log.Printf("TapirProcessGlobalConfig: %+v", gconfig)
log.Printf("TapirProcessGlobalConfig: %+v", gconfig)

// Assume there is only one topic and that it is the one we want
// TODO maybe sanitize or sanity check or something
newTopic := gconfig.ObservationTopics[0]

Check failure on line 75 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

gconfig.ObservationTopics undefined (type tapir.GlobalConfig has no field or method ObservationTopics)
bootstrapServers := gconfig.Bootstrap.Servers
bootstrapUrl := gconfig.Bootstrap.BaseUrl
bootstrapKey := gconfig.Bootstrap.ApiToken

Check failure on line 78 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

gconfig.Bootstrap.ApiToken undefined (type struct{Servers []string; BaseUrl string; ApiKey string} has no field or method ApiToken)

//for _, listtype := range []string{"whitelist", "blacklist", "greylist"} {
for _, wbgl := range pd.Lists["greylist"] {
if wbgl.Immutable || wbgl.Datasource != "mqtt" {

Check failure on line 82 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.Immutable undefined (type *tapir.WBGlist has no field or method Immutable)
continue
}

for topic := range wbgl.MqttDetails.ValidatorKeys {
pd.MqttEngine.RemoveTopic(topic)
delete(wbgl.MqttDetails.ValidatorKeys, topic)
break // Only one topic
}

valkey := GetValidationKeyByKeyName(newTopic.PubKeyName)
pd.mu.Lock()
wbgl.MqttDetails.ValidatorKeys[newTopic.Topic] = valkey
wbgl.MqttDetails.Bootstrap = bootstrapServers

Check failure on line 95 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.Bootstrap undefined (type *tapir.MqttDetails has no field or method Bootstrap)
wbgl.MqttDetails.BootstrapUrl = bootstrapUrl

Check failure on line 96 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.BootstrapUrl undefined (type *tapir.MqttDetails has no field or method BootstrapUrl)
wbgl.MqttDetails.BootstrapKey = bootstrapKey

Check failure on line 97 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.BootstrapKey undefined (type *tapir.MqttDetails has no field or method BootstrapKey)
pd.mu.Unlock()

_, err := pd.MqttEngine.SubToTopic(newTopic.Topic, valkey, pd.TapirObservations, "struct", true) // XXX: Brr. kludge.
if err != nil {
POPExiter("ProcessTapirGlobalConfig: Error adding topic %s: %v", newTopic, err)
}

src := SourceConf{
Bootstrap: wbgl.MqttDetails.Bootstrap,

Check failure on line 106 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.Bootstrap undefined (type *tapir.MqttDetails has no field or method Bootstrap)
BootstrapUrl: wbgl.MqttDetails.BootstrapUrl,

Check failure on line 107 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.BootstrapUrl undefined (type *tapir.MqttDetails has no field or method BootstrapUrl)
BootstrapKey: wbgl.MqttDetails.BootstrapKey,

Check failure on line 108 in configupdater.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

wbgl.MqttDetails.BootstrapKey undefined (type *tapir.MqttDetails has no field or method BootstrapKey)
Name: wbgl.Name,
Format: wbgl.Format,
}

if len(gconfig.Bootstrap.Servers) > 0 {
pd.Logger.Printf("ProcessTapirGlobalConfig: %d bootstrap servers advertised: %v", wbgl.Name, len(src.Bootstrap), src.Bootstrap)
tmp, err := pd.BootstrapMqttSource(src)
if err != nil {
pd.Logger.Printf("ProcessTapirGlobalConfig: Error bootstrapping MQTT source %s: %v", wbgl.Name, err)
} else {
pd.mu.Lock()
*wbgl = *tmp
pd.mu.Unlock()
}
}

pd.Logger.Printf("*** DONE Processing global config")
}
}
54 changes: 54 additions & 0 deletions keystore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"crypto/ecdsa"
"log"
"os"

"github.com/dnstapir/tapir"
"gopkg.in/yaml.v3"
)

func genKeyStore() map[string]string {
var ks = make(map[string]string)
keyStoreData, err := os.ReadFile(Gconfig.KeyStore.Path)

if err != nil {
log.Fatalf("Error from ReadFile(%s): %v", Gconfig.KeyStore.Path, err)
}

err = yaml.Unmarshal(keyStoreData, &ks)
if err != nil {
log.Fatalf("Error when unmarshaling keystore contents: %v", err)
}

return ks
}

func GetValidationKeyByKeyName(keyname string) *ecdsa.PublicKey {
ks := genKeyStore()


key, err := tapir.FetchMqttValidatorKey("KEYSTORE: "+ keyname, ks[keyname])

if err != nil {
log.Printf("Error getting key %s from keystore", keyname)
return nil
}

return key
}

// Unused, for now
func GetSigningKeyByKeyName(keyname string) *ecdsa.PrivateKey {
ks := genKeyStore()

key, err := tapir.FetchMqttSigningKey("KEYSTORE: "+ keyname, ks[keyname])

if err != nil {
log.Printf("Error getting key %s from keystore", keyname)
return nil
}

return key
}
2 changes: 2 additions & 0 deletions keystore.sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tapir-analyze-pubkey: /etc/dnstapir/certs/tapir-analyze.pub
tapir-mgmt-debug-pubkey: /etc/dnstapir/certs/tapir-analyze.pub
22 changes: 15 additions & 7 deletions pop.globalconfig.sample.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
globalconfig:
tapirconfigversion: 17
rpz:
envelopesize: 400
bootstrap:
servers: [ 77.72.231.135:5454, 1.2.3.4:1234 ]
baseurl: https://%s/api/v1/
apikey: be-lean-and-mean-and-in-between
tapirconfigversion: 17
rpz:
envelopesize: 400
bootstrap:
servers: [ ]
baseurl: https://%s/api/v1/
apitoken: be-lean-and-mean-and-in-between
observationtopics:
- topic: observations/down/tapir-pop
pubkeyname: tapir-analyze-pubkey
- topic: observations/down/tapir-pop/debug
pubkeyname: tapir-mgmt-debug-pubkey
statustopics:
- topic: status/up/axfr/tapir-pop
privkeyname: edge-{EdgeId}-signer-pubkey
12 changes: 11 additions & 1 deletion sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"crypto/ecdsa"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -211,10 +212,19 @@ func (pd *PopData) ParseSourcesNG() error {
}
pd.Logger.Printf("ParseSourcesNG: Topic data for topic %s: %+v", src.Topic, topicdata)

mqttDetails := tapir.MqttDetails{
ValidatorKeys: map[string]*ecdsa.PublicKey{src.Topic: valkey},
Bootstrap: src.Bootstrap,

Check failure on line 217 in sources.go

View workflow job for this annotation

GitHub Actions / build (1.23.1)

unknown field Bootstrap in struct literal of type tapir.MqttDetails
BootstrapUrl: src.BootstrapUrl,
BootstrapKey: src.BootstrapKey,
}
newsource.MqttDetails = &mqttDetails
newsource.Immutable = src.Immutable

newsource.Format = "map" // for now
if len(src.Bootstrap) > 0 {
pd.Logger.Printf("ParseSourcesNG: The %s MQTT source has %d bootstrap servers: %v", src.Name, len(src.Bootstrap), src.Bootstrap)
tmp, err := pd.BootstrapMqttSource(&newsource, src)
tmp, err := pd.BootstrapMqttSource(src)
if err != nil {
pd.Logger.Printf("Error bootstrapping MQTT source %s: %v", src.Name, err)
} else {
Expand Down

0 comments on commit 776286b

Please sign in to comment.