diff --git a/client.go b/client.go index ad354ab..0baaff3 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,7 @@ import ( "errors" "io" "sync" + "time" ) // ErrNotConnected is returned if a function is called before Connect. @@ -45,6 +46,47 @@ type BaseClient struct { muConnecting sync.RWMutex muWrite sync.Mutex idLast uint32 + + muStats sync.RWMutex + stats BaseStats +} + +// BaseStats stores base client statistics. +type BaseStats struct { + // Recent ping delay. + PingDelayRecent time.Duration + // Maximum ping delay. + PingDelayMax time.Duration + // Minimum ping delay. + PingDelayMin time.Duration + // Count of ping error. + CountPingError int +} + +func (c *BaseClient) storePingDelay(d time.Duration) { + c.muStats.Lock() + c.stats.PingDelayRecent = d + if c.stats.PingDelayMax < d { + c.stats.PingDelayMax = d + } + if c.stats.PingDelayMin > d || c.stats.PingDelayMin == 0 { + c.stats.PingDelayMin = d + } + c.muStats.Unlock() +} + +func (c *BaseClient) storePingError() { + c.muStats.Lock() + c.stats.PingDelayRecent = 0 + c.stats.CountPingError++ + c.muStats.Unlock() +} + +// Stats returns base client stats. +func (c *BaseClient) Stats() BaseStats { + c.muStats.RLock() + defer c.muStats.RUnlock() + return c.stats } // Handle registers the message handler. diff --git a/client_integration_test.go b/client_integration_test.go index a3fc974..03ff620 100644 --- a/client_integration_test.go +++ b/client_integration_test.go @@ -266,10 +266,36 @@ func TestIntegration_Ping(t *testing.T) { t.Fatalf("Unexpected error: '%v'", err) } + if d := cli.Stats().PingDelayRecent; d != 0 { + t.Errorf("Initial PingDelayRecent must be 0, got %v", d) + } + if d := cli.Stats().PingDelayMax; d != 0 { + t.Errorf("Initial PingDelayMax must be 0, got %v", d) + } + if d := cli.Stats().PingDelayMin; d != 0 { + t.Errorf("Initial PingDelayMin must be 0, got %v", d) + } + if c := cli.Stats().CountPingError; c != 0 { + t.Errorf("Initial CountPingError must be 0, got %v", c) + } + if err := cli.Ping(ctx); err != nil { t.Fatalf("Unexpected error: '%v'", err) } + if d := cli.Stats().PingDelayRecent; d <= 0 { + t.Errorf("Initial PingDelayRecent must be >0, got %v", d) + } + if d := cli.Stats().PingDelayMax; d <= 0 { + t.Errorf("Initial PingDelayMax must be >0, got %v", d) + } + if d := cli.Stats().PingDelayMin; d <= 0 { + t.Errorf("Initial PingDelayMin must be >0, got %v", d) + } + if c := cli.Stats().CountPingError; c != 0 { + t.Errorf("CountPingError must be 0, got %v", c) + } + if err := cli.Disconnect(ctx); err != nil { t.Fatalf("Unexpected error: '%v'", err) } diff --git a/keepalive_integration_test.go b/keepalive_integration_test.go index 8370875..8e0de84 100644 --- a/keepalive_integration_test.go +++ b/keepalive_integration_test.go @@ -54,6 +54,10 @@ func TestIntegration_KeepAlive(t *testing.T) { t.Fatalf("Unexpected error: '%v'", err) } } + + if c := cli.Stats().CountPingError; c != 1 { + t.Errorf("CountPingError must be 1, got %v", c) + } }) } } diff --git a/pingreq.go b/pingreq.go index 9a32851..160156e 100644 --- a/pingreq.go +++ b/pingreq.go @@ -16,6 +16,7 @@ package mqtt import ( "context" + "time" ) // Ping to the broker. @@ -33,6 +34,8 @@ func (c *BaseClient) Ping(ctx context.Context) error { sig.mu.Unlock() pkt := pack(packetPingReq.b()) + + tReq := time.Now() if err := c.write(pkt); err != nil { return wrapError(err, "sending PINGREQ") } @@ -40,8 +43,11 @@ func (c *BaseClient) Ping(ctx context.Context) error { case <-c.connClosed: return wrapError(ErrClosedTransport, "sending PINGREQ") case <-ctx.Done(): + c.storePingError() return wrapError(ctx.Err(), "waiting PINGRESP") case <-chPingResp: + tRes := time.Now() + c.storePingDelay(tRes.Sub(tReq)) } return nil }