Skip to content

Commit

Permalink
Improved WS
Browse files Browse the repository at this point in the history
  • Loading branch information
tskaard committed Jul 6, 2020
1 parent f3aa87b commit e3065a0
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 80 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ require (
github.com/machinebox/graphql v0.2.2
github.com/matryer/is v1.2.0 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/sirupsen/logrus v1.4.1
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/objx v0.1.1 // indirect
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
)

go 1.13
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/machinebox/graphql v0.2.2 h1:dWKpJligYKhYKO5A2gvNhkJdQMNZeChZYyBbrZkBZfo=
github.com/machinebox/graphql v0.2.2/go.mod h1:F+kbVMHuwrQ5tYgU9JXlnskM8nOaFxCAEolaQybkjWA=
github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A=
Expand All @@ -14,10 +16,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
6 changes: 6 additions & 0 deletions homes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type HomeViewer struct {
Home Home `json:"home"`
}

type PreviousMeterData struct {
Power float64 `json:"power"`
PowerProduction float64 `json:"powerProduction"`
}

// Home structure
type Home struct {
ID string `json:"id"`
Expand All @@ -40,6 +45,7 @@ type Home struct {
PrimaryHeatingSource string `json:"primaryHeatingSource"`
HasVentilationSystem bool `json:"hasVentilationSystem"`
CurrentSubscription CurrentSubscription `json:"currentSubscription"`
PreviousMeterData PreviousMeterData `json:"previousMeterData"`
}

type Address struct {
Expand Down
213 changes: 144 additions & 69 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
const subscriptionEndpoint = "v1-beta/gql/subscriptions"
const tibberHost = "api.tibber.com"

const (
StreamStateConnected = "CONNECTED"
StreamStateConnecting = "CONNECTING"
StreamStateDisconnected = "DISCONNECTED"
)

// MsgChan for reciving messages
type MsgChan chan *StreamMsg

Expand All @@ -25,6 +31,11 @@ type StreamMsg struct {
Payload Payload `json:"payload"`
}

type StreamState struct {
State string
Err error
}

// Payload in StreamMsg
type Payload struct {
Data Data `json:"data"`
Expand Down Expand Up @@ -65,6 +76,11 @@ func (m *LiveMeasurement) IsExtended() bool {
return m.CurrentPhase1 > 0 || m.CurrentPhase2 > 0 || m.CurrentPhase3 > 0
}

// HasPower returns true if the report contains power measurement
func (m *LiveMeasurement) HasPower() bool {
return m.Power > 0
}

// AsFloatMap returns the LiveMeasurement struct as a float map
func (m *LiveMeasurement) AsFloatMap() map[string]float64 {
return map[string]float64{
Expand All @@ -90,111 +106,169 @@ func (m *LiveMeasurement) AsFloatMap() map[string]float64 {

// Stream for subscribing to Tibber pulse
type Stream struct {
Token string
ID string
isRunning bool
initialized bool
client *websocket.Conn
Token string
ID string
isRunning bool
initialized bool
client *websocket.Conn
stateReportChan chan StreamState
outputChan MsgChan
}

func (ts *Stream) StateReportChan() chan StreamState {
return ts.stateReportChan
}

// NewStream with id and token
func NewStream(id, token string) *Stream {
ts := Stream{
ID: id,
Token: token,
isRunning: true,
initialized: false,
ID: id,
Token: token,
isRunning: true,
initialized: false,
stateReportChan: make(chan StreamState),
}
return &ts
}

// StartSubscription init connection and subscibes to home id
func (ts *Stream) StartSubscription(outputChan MsgChan) chan error {
// StartSubscription init connection and subscribes to home id
func (ts *Stream) StartSubscription(outputChan MsgChan) error {
// Connect
errChan := make(chan error)
ts.outputChan = outputChan
for {
err := ts.connect()
if err != nil {
log.WithError(err).Error("<TibberStream> Could not connect to websocket")
errChan <- err
time.Sleep(time.Second * 7) // trying to repair the connection
} else {
ts.initialized = false
log.Info("<TibberStream> Connected")
break // connection was made
}
}
ts.startMsgRouter()
return nil
}

func (ts *Stream) reportState(state string, err error) {
st := StreamState{
State: state,
Err: err,
}
select {
case ts.stateReportChan <- st:
default:
log.Debug("<TibberStream> No error liste")
}

}

func (ts *Stream) startMsgRouter() {
go func() {
defer func() {
if r := recover(); r != nil {
log.Error("<TibberStream> Process CRASHED with error: ", r)
ts.StartSubscription(outputChan)
}
}()
defer ts.client.Close()
for {
if !ts.initialized {
ts.sendInitMsg()
}
tm := StreamMsg{}
err := ts.client.ReadJSON(&tm)
if err != nil {
if websocket.IsCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure) {
log.WithError(err).Error("<TibberStream> CloseError, Reconnecting after 7 seconds")
errChan <- err
time.Sleep(time.Second * 7) // trying to repair the connection
ts.initialized = false
ts.msgLoop()
log.Error("<TibberStream> Restarting msg router")
}
}()
}

func (ts *Stream) msgLoop() {
defer func() {
if r := recover(); r != nil {
log.Error("<TibberStream> Process CRASHED with error: ", r)
time.Sleep(1 * time.Minute)
}
if ts.client != nil {
ts.client.Close()
}
}()
var unknownErrorCounter int
for {
if !ts.initialized {
ts.sendInitMsg()
}
tm := StreamMsg{}
err := ts.client.ReadJSON(&tm)
if err != nil {
if ts.isWsCloseError(err) {
log.WithError(err).Error("<TibberStream> CloseError, Reconnecting after 10 seconds")
ts.reportState(StreamStateDisconnected, err)
time.Sleep(time.Second * 10) // trying to repair the connection
ts.initialized = false
err = ts.connect()
if err != nil {
log.WithError(err).Error("<TibberStream> Could not connect to websocket")
time.Sleep(time.Second * 30)
}
continue
} else {
unknownErrorCounter++
log.WithError(err).Error("<TibberStream> Unknown error while reading data from WS")
ts.reportState(StreamStateDisconnected, err)
time.Sleep(time.Second * 20)
if unknownErrorCounter > 10 {
ts.client.Close()
err = ts.connect()
if err != nil {
log.WithError(err).Error("<TibberStream> Could not connect to websocket")
errChan <- err
time.Sleep(time.Second * 60)
}
continue
} else {
log.WithError(err).Error()
errChan <- err
continue
}
} else {
switch tm.Type {
case "init_success":
log.Info("<TibberStream> Init success")
ts.initialized = true
ts.sendSubMsg()

case "subscription_success":
log.Info("<TibberStream> Subscription success")

case "subscription_data":
tm.HomeID = ts.ID
outputChan <- &tm

case "subscription_fail":
err := fmt.Errorf("Subscription failed")
log.WithError(err).Error()
errChan <- err

default:
err := fmt.Errorf("Unexpected message: %s", tm.Type)
log.WithError(err).Error()
errChan <- err
}
continue
}
if !ts.isRunning {
log.Debug("<TibberStream> Stopping")
break
} else {
unknownErrorCounter = 0
switch tm.Type {
case "init_success":
log.Info("<TibberStream> Init success")
ts.initialized = true
ts.sendSubMsg()

case "subscription_success":
log.Info("<TibberStream> Subscription success")

case "subscription_data":
tm.HomeID = ts.ID
ts.outputChan <- &tm

case "subscription_fail":
err := fmt.Errorf("subscription failed")
log.WithError(err).Error("<TibberStream>")
ts.reportState(StreamStateDisconnected, err)

default:
log.Info("<TibberStream> Unexpected message type :", tm.Type)
}
}
}()
return errChan
if !ts.isRunning {
log.Debug("<TibberStream> Stopping")
break
}
}
}
func (ts *Stream) isWsCloseError(err error) bool {
return websocket.IsCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake)
}

func (ts *Stream) connect() error {
defer func() {
if r := recover(); r != nil {
log.Error("<TibberStream> ID: ", ts.ID, " - Process CRASHED with error : ", r)
time.Sleep(time.Minute * 1)
}
}()
u := url.URL{Scheme: "wss", Host: tibberHost, Path: subscriptionEndpoint}
Expand All @@ -211,6 +285,7 @@ func (ts *Stream) connect() error {
} else {
log.Info("<TibberStream> WS Client is connected - ID: ", ts.ID, " error: ", err)
ts.isRunning = true
ts.reportState(StreamStateConnected, err)
return nil
}
}
Expand Down
20 changes: 11 additions & 9 deletions tibber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func TestGetHomes(t *testing.T) {
func TestGetHomeById(t *testing.T) {
token := string(helperLoadBytes(t, "token.txt"))
tc := NewClient(token)
homeId := string(helperLoadBytes(t, "homeId.txt"))
home, _ := tc.GetHomeById(homeId)
homeID := string(helperLoadBytes(t, "homeId.txt"))
home, _ := tc.GetHomeById(homeID)
if home.ID == "" {
t.Fatalf("GetHomeById: %s %v", homeId, home)
t.Fatalf("GetHomeById: %s %v", homeID, home)
}
}

Expand All @@ -49,11 +49,13 @@ func TestStreams(t *testing.T) {
token := string(helperLoadBytes(t, "token.txt"))
homeID := string(helperLoadBytes(t, "homeId.txt"))
stream := NewStream(homeID, token)
errChan := stream.StartSubscription(msgCh)

err := stream.StartSubscription(msgCh)
if err != nil {
t.Fatalf("Push: %v", err)
}
select {
case err := <-errChan:
t.Fatalf("Stream: %v", err)
case msg := <-msgCh:
t.Log(msg)
case <-time.After(time.Second * 7):
break
}
Expand All @@ -63,8 +65,8 @@ func TestStreams(t *testing.T) {
func TestGetCurrentPrice(t *testing.T) {
token := string(helperLoadBytes(t, "token.txt"))
tc := NewClient(token)
homeId := string(helperLoadBytes(t, "homeId.txt"))
priceInfo, _ := tc.GetCurrentPrice(homeId)
homeID := string(helperLoadBytes(t, "homeId.txt"))
priceInfo, _ := tc.GetCurrentPrice(homeID)
if priceInfo.Level == "" {
t.Fatalf("GetCurrentPrice: %v", priceInfo)
}
Expand Down

0 comments on commit e3065a0

Please sign in to comment.