From 29b945cfed3df85c3f93d2ca669e84326aa6cba9 Mon Sep 17 00:00:00 2001 From: Johan Stenstam Date: Fri, 20 Sep 2024 14:48:56 +0200 Subject: [PATCH] * fixed a bug in statusupdater: {EdgeId} wasn't correctly expanded before subscribing to the mqtt topic * fixed a bug in parsesources: the return channel was still left in the old model (with one global channel for all messages) rather that the new model with a dedicated channel per message type. --- sources.go | 3 +-- statusupdater.go | 31 ++++++++++++++++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/sources.go b/sources.go index 78ba6a8..432fe60 100644 --- a/sources.go +++ b/sources.go @@ -216,8 +216,7 @@ func (td *TemData) ParseSourcesNG() error { } td.Logger.Printf("ParseSourcesNG: Adding topic '%s' to MQTT Engine", src.Topic) - // err = td.MqttEngine.AddTopic(src.Topic, nil, valkey) - topicdata, err := td.MqttEngine.SubToTopic(src.Topic, valkey, nil, "struct", true) // XXX: Brr. kludge. + topicdata, err := td.MqttEngine.SubToTopic(src.Topic, valkey, td.TapirObservations, "struct", true) // XXX: Brr. kludge. if err != nil { TEMExiter("Error adding topic %s to MQTT Engine: %v", src.Topic, err) } diff --git a/statusupdater.go b/statusupdater.go index b63a40a..2aa7528 100644 --- a/statusupdater.go +++ b/statusupdater.go @@ -28,24 +28,37 @@ func (td *TemData) StatusUpdater(conf *Config, stopch chan struct{}) { // } // Create a new mqtt engine just for the statusupdater. - me, err := tapir.NewMqttEngine("statusupdater", viper.GetString("tapir.mqtt.clientid")+"statusupdates", tapir.TapirPub, td.ComponentStatusCh, log.Default()) - if err != nil { - TEMExiter("StatusUpdater: Error creating MQTT Engine: %v", err) - } - - // var TemStatusCh = make(chan tapir.TemStatusUpdate, 100) - //conf.Internal.TemStatusCh = TemStatusCh + // me, err := tapir.NewMqttEngine("statusupdater", viper.GetString("tapir.mqtt.clientid")+"statusupdates", tapir.TapirPub, td.ComponentStatusCh, log.Default()) + // if err != nil { + // TEMExiter("StatusUpdater: Error creating MQTT Engine: %v", err) + // } + me := td.MqttEngine ticker := time.NewTicker(60 * time.Second) - statusTopic := viper.GetString("tapir.status.topic") - if statusTopic == "" { + // var statusch = make(chan tapir.ComponentStatusUpdate, 10) + // If any status updates arrive, print them out + // go func() { + // for status := range statusch { + // fmt.Printf("Status update: %+v\n", status) + // } + // }() + + certCN, _, _, err := tapir.FetchTapirClientCert(log.Default(), td.ComponentStatusCh) + if err != nil { + TEMExiter("StatusUpdater: Error fetching client certificate: %v", err) + } + + statusTopic, err := tapir.MqttTopic(certCN, "tapir.status.topic") + if err != nil { TEMExiter("StatusUpdater: MQTT status topic not set") } + keyfile := viper.GetString("tapir.status.signingkey") if keyfile == "" { TEMExiter("StatusUpdater: MQTT status signing key not set") } + keyfile = filepath.Clean(keyfile) signkey, err := tapir.FetchMqttSigningKey(statusTopic, keyfile) if err != nil {