From 74e48b04236804a18a2ec7555291ab233a2c6e87 Mon Sep 17 00:00:00 2001 From: Victor Antonovich Date: Fri, 15 Jul 2022 12:23:20 +0300 Subject: [PATCH] Add HTTP station data publisher in ESPEasy JSON format --- .gitignore | 1 + esp.go | 132 ++++++++++++++++++++++++++++++++++----------------- main.go | 10 +++- publisher.go | 99 ++++++++++++++++++++++++++++++++++++++ station.go | 25 ++++++---- 5 files changed, 215 insertions(+), 52 deletions(-) create mode 100644 publisher.go diff --git a/.gitignore b/.gitignore index b80ee64..49b43e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Folders _test/ bin/ +.vscode/ # IDE files *.iml diff --git a/esp.go b/esp.go index 3540143..a176c83 100644 --- a/esp.go +++ b/esp.go @@ -15,57 +15,60 @@ package main import ( + "time" + "github.com/openairtech/api" ) type EspData struct { - System EspSystem `json:"System"` - WiFi EspWiFi `json:"WiFi"` - Sensors []EspSensors `json:"Sensors"` - TTL int `json:"TTL"` + System *EspSystem `json:"System,omitempty"` + WiFi *EspWiFi `json:"WiFi,omitempty"` + Sensors []EspSensors `json:"Sensors,omitempty"` + TTL int `json:"TTL,omitempty"` } type EspSystem struct { - Build int `json:"Build"` - GitBuild string `json:"Git Build"` - SystemLibraries string `json:"System libraries"` - Plugins int `json:"Plugins"` - PluginDescription string `json:"Plugin description"` - LocalTime string `json:"Local time"` - Unit int `json:"Unit"` - Name string `json:"Name"` - Uptime int `json:"Uptime"` - LastBootCause string `json:"Last boot cause"` - ResetReason string `json:"Reset Reason"` - Load float32 `json:"Load"` - LoadLC int `json:"Load LC"` - FreeRAM int `json:"Free RAM"` + Build int `json:"Build,omitempty"` + GitBuild string `json:"Git Build,omitempty"` + SystemLibraries string `json:"System libraries,omitempty"` + Plugins int `json:"Plugins,omitempty"` + PluginDescription string `json:"Plugin description,omitempty"` + LocalTime string `json:"Local time,omitempty"` + Unit int `json:"Unit,omitempty"` + Name string `json:"Name,omitempty"` + UnitName string `json:"Unit Name,omitempty"` + Uptime int `json:"Uptime,omitempty"` + LastBootCause string `json:"Last boot cause,omitempty"` + ResetReason string `json:"Reset Reason,omitempty"` + Load float32 `json:"Load,omitempty"` + LoadLC int `json:"Load LC,omitempty"` + FreeRAM int `json:"Free RAM,omitempty"` } type EspWiFi struct { - Hostname string `json:"Hostname"` - IPConfig string `json:"IP config"` - IP string `json:"IP"` - SubnetMask string `json:"Subnet Mask"` - GatewayIP string `json:"Gateway IP"` - MACAddress string `json:"MAC address"` // mega-20190301 - StationMAC string `json:"STA MAC"` // mega-20190903 - DNS1 string `json:"DNS 1"` - DNS2 string `json:"DNS 2"` - SSID string `json:"SSID"` - BSSID string `json:"BSSID"` - Channel int `json:"Channel"` - ConnectedMsec int `json:"Connected msec"` - LastDisconnectReason int `json:"Last Disconnect Reason"` - LastDisconnectReasonStr string `json:"Last Disconnect Reason str"` - NumberReconnects int `json:"Number reconnects"` - RSSI int `json:"RSSI"` + Hostname string `json:"Hostname,omitempty"` + IPConfig string `json:"IP config,omitempty"` + IP string `json:"IP,omitempty"` + SubnetMask string `json:"Subnet Mask,omitempty"` + GatewayIP string `json:"Gateway IP,omitempty"` + MACAddress string `json:"MAC address"` // mega-20190301 + StationMAC string `json:"STA MAC,omitempty"` // mega-20190903 + DNS1 string `json:"DNS 1,omitempty"` + DNS2 string `json:"DNS 2,omitempty"` + SSID string `json:"SSID,omitempty"` + BSSID string `json:"BSSID,omitempty"` + Channel int `json:"Channel,omitempty"` + ConnectedMsec int `json:"Connected msec,omitempty"` + LastDisconnectReason int `json:"Last Disconnect Reason,omitempty"` + LastDisconnectReasonStr string `json:"Last Disconnect Reason str,omitempty"` + NumberReconnects int `json:"Number reconnects,omitempty"` + RSSI int `json:"RSSI,omitempty"` } type EspTaskValues struct { - ValueNumber int `json:"ValueNumber"` + ValueNumber int `json:"ValueNumber,omitempty"` Name string `json:"Name"` - NrDecimals int `json:"NrDecimals"` + NrDecimals int `json:"NrDecimals,omitempty"` Value float32 `json:"Value"` } @@ -76,13 +79,13 @@ type EspDataAcquisition struct { } type EspSensors struct { - TaskValues []EspTaskValues `json:"TaskValues"` - DataAcquisition []EspDataAcquisition `json:"DataAcquisition"` - TaskInterval int `json:"TaskInterval"` - Type string `json:"Type"` + TaskValues []EspTaskValues `json:"TaskValues,omitempty"` + DataAcquisition []EspDataAcquisition `json:"DataAcquisition,omitempty"` + TaskInterval int `json:"TaskInterval,omitempty"` + Type string `json:"Type,omitempty"` TaskName string `json:"TaskName"` - TaskEnabled bool `json:"TaskEnabled,string"` - TaskNumber int `json:"TaskNumber"` + TaskEnabled bool `json:"TaskEnabled,string,omitempty"` + TaskNumber int `json:"TaskNumber,omitempty"` } type EspGpioControlResponse struct { @@ -93,6 +96,49 @@ type EspGpioControlResponse struct { State int `json:"state"` } +func NewEspData(m *api.Measurement, uptime time.Duration, name string) *EspData { + bmeSensor := EspSensors{ + TaskName: "BME280", + TaskValues: []EspTaskValues{ + { + Name: "Temperature", + Value: *m.Temperature, + }, + { + Name: "Humidity", + Value: *m.Humidity, + }, + { + Name: "Pressure", + Value: *m.Pressure, + }, + }, + } + sdsSensor := EspSensors{ + TaskName: "SDS011", + TaskValues: []EspTaskValues{ + { + Name: "PM2.5", + Value: *m.Pm25, + }, + { + Name: "PM10", + Value: *m.Pm10, + }, + }, + } + return &EspData{ + System: &EspSystem{ + UnitName: name, + Uptime: int(uptime.Minutes()), + }, + Sensors: []EspSensors{ + bmeSensor, + sdsSensor, + }, + } +} + func (ed *EspData) Measurement(t api.UnixTime) *api.Measurement { m := api.Measurement{ Timestamp: &t, diff --git a/main.go b/main.go index 6f365ea..7fb21b1 100644 --- a/main.go +++ b/main.go @@ -94,6 +94,8 @@ func main() { disabledFeeders := stringArray{} flag.Var(&disabledFeeders, "D", fmt.Sprintf("disable feeder (%s)", fnl)) + httpPublisherPort := flag.Int("J", 0, "sensor data HTTP publisher port (0 to disable HTTP publisher)") + flag.Parse() if *versionFlag { @@ -172,6 +174,12 @@ func main() { log.Debugf("enabled feeders: [%s]", SliceToString(efn)) + var ps []Publisher + + if *httpPublisherPort > 0 { + ps = append(ps, NewHttpPublisher(*httpPublisherPort)) + } + var station Station if *mode == StationModeEsp { station = NewEspStation(version, *espHost, *espPort, *espHeaterGpioPin, *stationTokenId) @@ -183,7 +191,7 @@ func main() { } } - RunStation(ctx, station, ef, *updateInterval, *settleTime, *disablePmCorrectionFlag, + RunStation(ctx, station, ef, ps, *updateInterval, *settleTime, *disablePmCorrectionFlag, *enableHeaterFlag, *heaterTurnOnHumidity) log.Printf("exiting...") diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..773e052 --- /dev/null +++ b/publisher.go @@ -0,0 +1,99 @@ +// Copyright © 2022 Victor Antonovich +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +type Publisher interface { + Start() error + Stop() + Publish(data *StationData) +} + +type HttpPublisher struct { + sync.Mutex + + port int + + server *http.Server + serverStopWg *sync.WaitGroup + + lastData *StationData +} + +func NewHttpPublisher(port int) *HttpPublisher { + return &HttpPublisher{ + port: port, + } +} + +func (hp *HttpPublisher) Start() error { + log.Printf("starting sensor data HTTP publisher at http://0.0.0.0:%d/json", hp.port) + mux := http.NewServeMux() + mux.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) { + if hp.lastData == nil { + w.WriteHeader(503) + return + } + ld := hp.lastData + ep := NewEspData(ld.LastMeasurement, ld.Uptime, ld.Version) + jd, err := json.Marshal(ep) + if err != nil { + w.WriteHeader(500) + w.Write([]byte(err.Error())) + return + } + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + w.Write(jd) + }) + hp.server = &http.Server{Addr: fmt.Sprintf(":%d", hp.port), Handler: mux} + hp.serverStopWg = &sync.WaitGroup{} + hp.serverStopWg.Add(1) + go func() { + defer hp.serverStopWg.Done() + if err := hp.server.ListenAndServe(); err != http.ErrServerClosed { + log.Errorf("can't start sensor data HTTP publisher: %v", err) + } + }() + return nil +} + +func (hp *HttpPublisher) Stop() { + log.Print("stopping sensor data HTTP publisher...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := hp.server.Shutdown(ctx); err != nil { + log.Errorf("error while stopping sensor data HTTP server: %v", err) + } + hp.serverStopWg.Wait() + log.Print("sensor data HTTP publisher stopped") +} + +func (hp *HttpPublisher) Publish(data *StationData) { + lastData := *data + hp.Lock() + defer hp.Unlock() + hp.lastData = &lastData +} diff --git a/station.go b/station.go index ea0e2b2..a94d8d7 100644 --- a/station.go +++ b/station.go @@ -394,10 +394,15 @@ func (rs *RpiStation) GetData() (*StationData, error) { }, nil } -func RunStation(ctx context.Context, station Station, feeders []Feeder, updateInterval time.Duration, - settleTime time.Duration, disablePmCorrection, enableHeater bool, heaterTurnOnHumidity int) { +func RunStation(ctx context.Context, station Station, feeders []Feeder, publishers []Publisher, + updateInterval time.Duration, settleTime time.Duration, disablePmCorrection, enableHeater bool, + heaterTurnOnHumidity int) { p := time.Duration(0) + for _, publisher := range publishers { + publisher.Start() + } + if err := station.Start(); err != nil { log.Errorf("can't start station: %v", err) return @@ -411,6 +416,10 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn defer station.Stop() + for _, publisher := range publishers { + defer publisher.Stop() + } + for { select { case <-time.After(p): @@ -443,17 +452,13 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn Float32RefToString(m.Temperature), Float32RefToString(m.Humidity), Float32RefToString(m.Pressure), Float32RefToString(m.Pm25), Float32RefToString(m.Pm10)) - if len(feeders) == 0 { - continue - } - if time.Now().Before(time.Unix(systemEpoch, 0)) { - log.Info("skipping station data feeding since station system time probably is not in sync") + log.Info("ignoring station data since station system time probably is not in sync") continue } if data.Uptime < settleTime { - log.Infof("skipping station data feeding since station uptime (%+v) is "+ + log.Infof("ignoring station data since station uptime (%+v) is "+ "shorter than data settle time (%+v)", data.Uptime, settleTime) continue } @@ -462,6 +467,10 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn feeder.Feed(data) } + for _, publisher := range publishers { + publisher.Publish(data) + } + case <-ctx.Done(): return }