Skip to content

Commit

Permalink
Add HTTP station data publisher in ESPEasy JSON format
Browse files Browse the repository at this point in the history
  • Loading branch information
3cky committed Jul 15, 2022
1 parent 5817d25 commit 74e48b0
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Folders
_test/
bin/
.vscode/

# IDE files
*.iml
Expand Down
132 changes: 89 additions & 43 deletions esp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,60 @@
package main

import (
"time"

"github.com/openairtech/api"
)

type EspData struct {
System EspSystem `json:"System"`
WiFi EspWiFi `json:"WiFi"`
Sensors []EspSensors `json:"Sensors"`
TTL int `json:"TTL"`
System *EspSystem `json:"System,omitempty"`
WiFi *EspWiFi `json:"WiFi,omitempty"`
Sensors []EspSensors `json:"Sensors,omitempty"`
TTL int `json:"TTL,omitempty"`
}

type EspSystem struct {
Build int `json:"Build"`
GitBuild string `json:"Git Build"`
SystemLibraries string `json:"System libraries"`
Plugins int `json:"Plugins"`
PluginDescription string `json:"Plugin description"`
LocalTime string `json:"Local time"`
Unit int `json:"Unit"`
Name string `json:"Name"`
Uptime int `json:"Uptime"`
LastBootCause string `json:"Last boot cause"`
ResetReason string `json:"Reset Reason"`
Load float32 `json:"Load"`
LoadLC int `json:"Load LC"`
FreeRAM int `json:"Free RAM"`
Build int `json:"Build,omitempty"`
GitBuild string `json:"Git Build,omitempty"`
SystemLibraries string `json:"System libraries,omitempty"`
Plugins int `json:"Plugins,omitempty"`
PluginDescription string `json:"Plugin description,omitempty"`
LocalTime string `json:"Local time,omitempty"`
Unit int `json:"Unit,omitempty"`
Name string `json:"Name,omitempty"`
UnitName string `json:"Unit Name,omitempty"`
Uptime int `json:"Uptime,omitempty"`
LastBootCause string `json:"Last boot cause,omitempty"`
ResetReason string `json:"Reset Reason,omitempty"`
Load float32 `json:"Load,omitempty"`
LoadLC int `json:"Load LC,omitempty"`
FreeRAM int `json:"Free RAM,omitempty"`
}

type EspWiFi struct {
Hostname string `json:"Hostname"`
IPConfig string `json:"IP config"`
IP string `json:"IP"`
SubnetMask string `json:"Subnet Mask"`
GatewayIP string `json:"Gateway IP"`
MACAddress string `json:"MAC address"` // mega-20190301
StationMAC string `json:"STA MAC"` // mega-20190903
DNS1 string `json:"DNS 1"`
DNS2 string `json:"DNS 2"`
SSID string `json:"SSID"`
BSSID string `json:"BSSID"`
Channel int `json:"Channel"`
ConnectedMsec int `json:"Connected msec"`
LastDisconnectReason int `json:"Last Disconnect Reason"`
LastDisconnectReasonStr string `json:"Last Disconnect Reason str"`
NumberReconnects int `json:"Number reconnects"`
RSSI int `json:"RSSI"`
Hostname string `json:"Hostname,omitempty"`
IPConfig string `json:"IP config,omitempty"`
IP string `json:"IP,omitempty"`
SubnetMask string `json:"Subnet Mask,omitempty"`
GatewayIP string `json:"Gateway IP,omitempty"`
MACAddress string `json:"MAC address"` // mega-20190301
StationMAC string `json:"STA MAC,omitempty"` // mega-20190903
DNS1 string `json:"DNS 1,omitempty"`
DNS2 string `json:"DNS 2,omitempty"`
SSID string `json:"SSID,omitempty"`
BSSID string `json:"BSSID,omitempty"`
Channel int `json:"Channel,omitempty"`
ConnectedMsec int `json:"Connected msec,omitempty"`
LastDisconnectReason int `json:"Last Disconnect Reason,omitempty"`
LastDisconnectReasonStr string `json:"Last Disconnect Reason str,omitempty"`
NumberReconnects int `json:"Number reconnects,omitempty"`
RSSI int `json:"RSSI,omitempty"`
}

type EspTaskValues struct {
ValueNumber int `json:"ValueNumber"`
ValueNumber int `json:"ValueNumber,omitempty"`
Name string `json:"Name"`
NrDecimals int `json:"NrDecimals"`
NrDecimals int `json:"NrDecimals,omitempty"`
Value float32 `json:"Value"`
}

Expand All @@ -76,13 +79,13 @@ type EspDataAcquisition struct {
}

type EspSensors struct {
TaskValues []EspTaskValues `json:"TaskValues"`
DataAcquisition []EspDataAcquisition `json:"DataAcquisition"`
TaskInterval int `json:"TaskInterval"`
Type string `json:"Type"`
TaskValues []EspTaskValues `json:"TaskValues,omitempty"`
DataAcquisition []EspDataAcquisition `json:"DataAcquisition,omitempty"`
TaskInterval int `json:"TaskInterval,omitempty"`
Type string `json:"Type,omitempty"`
TaskName string `json:"TaskName"`
TaskEnabled bool `json:"TaskEnabled,string"`
TaskNumber int `json:"TaskNumber"`
TaskEnabled bool `json:"TaskEnabled,string,omitempty"`
TaskNumber int `json:"TaskNumber,omitempty"`
}

type EspGpioControlResponse struct {
Expand All @@ -93,6 +96,49 @@ type EspGpioControlResponse struct {
State int `json:"state"`
}

func NewEspData(m *api.Measurement, uptime time.Duration, name string) *EspData {
bmeSensor := EspSensors{
TaskName: "BME280",
TaskValues: []EspTaskValues{
{
Name: "Temperature",
Value: *m.Temperature,
},
{
Name: "Humidity",
Value: *m.Humidity,
},
{
Name: "Pressure",
Value: *m.Pressure,
},
},
}
sdsSensor := EspSensors{
TaskName: "SDS011",
TaskValues: []EspTaskValues{
{
Name: "PM2.5",
Value: *m.Pm25,
},
{
Name: "PM10",
Value: *m.Pm10,
},
},
}
return &EspData{
System: &EspSystem{
UnitName: name,
Uptime: int(uptime.Minutes()),
},
Sensors: []EspSensors{
bmeSensor,
sdsSensor,
},
}
}

func (ed *EspData) Measurement(t api.UnixTime) *api.Measurement {
m := api.Measurement{
Timestamp: &t,
Expand Down
10 changes: 9 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func main() {
disabledFeeders := stringArray{}
flag.Var(&disabledFeeders, "D", fmt.Sprintf("disable feeder (%s)", fnl))

httpPublisherPort := flag.Int("J", 0, "sensor data HTTP publisher port (0 to disable HTTP publisher)")

flag.Parse()

if *versionFlag {
Expand Down Expand Up @@ -172,6 +174,12 @@ func main() {

log.Debugf("enabled feeders: [%s]", SliceToString(efn))

var ps []Publisher

if *httpPublisherPort > 0 {
ps = append(ps, NewHttpPublisher(*httpPublisherPort))
}

var station Station
if *mode == StationModeEsp {
station = NewEspStation(version, *espHost, *espPort, *espHeaterGpioPin, *stationTokenId)
Expand All @@ -183,7 +191,7 @@ func main() {
}
}

RunStation(ctx, station, ef, *updateInterval, *settleTime, *disablePmCorrectionFlag,
RunStation(ctx, station, ef, ps, *updateInterval, *settleTime, *disablePmCorrectionFlag,
*enableHeaterFlag, *heaterTurnOnHumidity)

log.Printf("exiting...")
Expand Down
99 changes: 99 additions & 0 deletions publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright © 2022 Victor Antonovich <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

type Publisher interface {
Start() error
Stop()
Publish(data *StationData)
}

type HttpPublisher struct {
sync.Mutex

port int

server *http.Server
serverStopWg *sync.WaitGroup

lastData *StationData
}

func NewHttpPublisher(port int) *HttpPublisher {
return &HttpPublisher{
port: port,
}
}

func (hp *HttpPublisher) Start() error {
log.Printf("starting sensor data HTTP publisher at http://0.0.0.0:%d/json", hp.port)
mux := http.NewServeMux()
mux.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
if hp.lastData == nil {
w.WriteHeader(503)
return
}
ld := hp.lastData
ep := NewEspData(ld.LastMeasurement, ld.Uptime, ld.Version)
jd, err := json.Marshal(ep)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
w.Write(jd)
})
hp.server = &http.Server{Addr: fmt.Sprintf(":%d", hp.port), Handler: mux}
hp.serverStopWg = &sync.WaitGroup{}
hp.serverStopWg.Add(1)
go func() {
defer hp.serverStopWg.Done()
if err := hp.server.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("can't start sensor data HTTP publisher: %v", err)
}
}()
return nil
}

func (hp *HttpPublisher) Stop() {
log.Print("stopping sensor data HTTP publisher...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := hp.server.Shutdown(ctx); err != nil {
log.Errorf("error while stopping sensor data HTTP server: %v", err)
}
hp.serverStopWg.Wait()
log.Print("sensor data HTTP publisher stopped")
}

func (hp *HttpPublisher) Publish(data *StationData) {
lastData := *data
hp.Lock()
defer hp.Unlock()
hp.lastData = &lastData
}
25 changes: 17 additions & 8 deletions station.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,15 @@ func (rs *RpiStation) GetData() (*StationData, error) {
}, nil
}

func RunStation(ctx context.Context, station Station, feeders []Feeder, updateInterval time.Duration,
settleTime time.Duration, disablePmCorrection, enableHeater bool, heaterTurnOnHumidity int) {
func RunStation(ctx context.Context, station Station, feeders []Feeder, publishers []Publisher,
updateInterval time.Duration, settleTime time.Duration, disablePmCorrection, enableHeater bool,
heaterTurnOnHumidity int) {
p := time.Duration(0)

for _, publisher := range publishers {
publisher.Start()
}

if err := station.Start(); err != nil {
log.Errorf("can't start station: %v", err)
return
Expand All @@ -411,6 +416,10 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn

defer station.Stop()

for _, publisher := range publishers {
defer publisher.Stop()
}

for {
select {
case <-time.After(p):
Expand Down Expand Up @@ -443,17 +452,13 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn
Float32RefToString(m.Temperature), Float32RefToString(m.Humidity), Float32RefToString(m.Pressure),
Float32RefToString(m.Pm25), Float32RefToString(m.Pm10))

if len(feeders) == 0 {
continue
}

if time.Now().Before(time.Unix(systemEpoch, 0)) {
log.Info("skipping station data feeding since station system time probably is not in sync")
log.Info("ignoring station data since station system time probably is not in sync")
continue
}

if data.Uptime < settleTime {
log.Infof("skipping station data feeding since station uptime (%+v) is "+
log.Infof("ignoring station data since station uptime (%+v) is "+
"shorter than data settle time (%+v)", data.Uptime, settleTime)
continue
}
Expand All @@ -462,6 +467,10 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updateIn
feeder.Feed(data)
}

for _, publisher := range publishers {
publisher.Publish(data)
}

case <-ctx.Done():
return
}
Expand Down

0 comments on commit 74e48b0

Please sign in to comment.