Skip to content

Commit

Permalink
Add ping stats (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 11, 2023
1 parent bea3bcd commit 1243f27
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 0 deletions.
42 changes: 42 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"io"
"sync"
"time"
)

// ErrNotConnected is returned if a function is called before Connect.
Expand Down Expand Up @@ -45,6 +46,47 @@ type BaseClient struct {
muConnecting sync.RWMutex
muWrite sync.Mutex
idLast uint32

muStats sync.RWMutex
stats BaseStats
}

// BaseStats stores base client statistics.
type BaseStats struct {
// Recent ping delay.
PingDelayRecent time.Duration
// Maximum ping delay.
PingDelayMax time.Duration
// Minimum ping delay.
PingDelayMin time.Duration
// Count of ping error.
CountPingError int
}

func (c *BaseClient) storePingDelay(d time.Duration) {
c.muStats.Lock()
c.stats.PingDelayRecent = d
if c.stats.PingDelayMax < d {
c.stats.PingDelayMax = d
}
if c.stats.PingDelayMin > d || c.stats.PingDelayMin == 0 {
c.stats.PingDelayMin = d
}
c.muStats.Unlock()
}

func (c *BaseClient) storePingError() {
c.muStats.Lock()
c.stats.PingDelayRecent = 0
c.stats.CountPingError++
c.muStats.Unlock()
}

// Stats returns base client stats.
func (c *BaseClient) Stats() BaseStats {
c.muStats.RLock()
defer c.muStats.RUnlock()
return c.stats
}

// Handle registers the message handler.
Expand Down
26 changes: 26 additions & 0 deletions client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,36 @@ func TestIntegration_Ping(t *testing.T) {
t.Fatalf("Unexpected error: '%v'", err)
}

if d := cli.Stats().PingDelayRecent; d != 0 {
t.Errorf("Initial PingDelayRecent must be 0, got %v", d)
}
if d := cli.Stats().PingDelayMax; d != 0 {
t.Errorf("Initial PingDelayMax must be 0, got %v", d)
}
if d := cli.Stats().PingDelayMin; d != 0 {
t.Errorf("Initial PingDelayMin must be 0, got %v", d)
}
if c := cli.Stats().CountPingError; c != 0 {
t.Errorf("Initial CountPingError must be 0, got %v", c)
}

if err := cli.Ping(ctx); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}

if d := cli.Stats().PingDelayRecent; d <= 0 {
t.Errorf("Initial PingDelayRecent must be >0, got %v", d)
}
if d := cli.Stats().PingDelayMax; d <= 0 {
t.Errorf("Initial PingDelayMax must be >0, got %v", d)
}
if d := cli.Stats().PingDelayMin; d <= 0 {
t.Errorf("Initial PingDelayMin must be >0, got %v", d)
}
if c := cli.Stats().CountPingError; c != 0 {
t.Errorf("CountPingError must be 0, got %v", c)
}

if err := cli.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}
Expand Down
4 changes: 4 additions & 0 deletions keepalive_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func TestIntegration_KeepAlive(t *testing.T) {
t.Fatalf("Unexpected error: '%v'", err)
}
}

if c := cli.Stats().CountPingError; c != 1 {
t.Errorf("CountPingError must be 1, got %v", c)
}
})
}
}
6 changes: 6 additions & 0 deletions pingreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mqtt

import (
"context"
"time"
)

// Ping to the broker.
Expand All @@ -33,15 +34,20 @@ func (c *BaseClient) Ping(ctx context.Context) error {
sig.mu.Unlock()

pkt := pack(packetPingReq.b())

tReq := time.Now()
if err := c.write(pkt); err != nil {
return wrapError(err, "sending PINGREQ")
}
select {
case <-c.connClosed:
return wrapError(ErrClosedTransport, "sending PINGREQ")
case <-ctx.Done():
c.storePingError()
return wrapError(ctx.Err(), "waiting PINGRESP")
case <-chPingResp:
tRes := time.Now()
c.storePingDelay(tRes.Sub(tReq))
}
return nil
}

0 comments on commit 1243f27

Please sign in to comment.