From 840f7ec87f571ef7c044dc4f41a947f534466afa Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 31 Dec 2024 20:42:00 -0800 Subject: [PATCH] move all telemetry to prometheus --- protocol/messenger.go | 6 +- telemetry/client.go | 695 ++++++-------------------------------- telemetry/client_test.go | 710 ++++++++++----------------------------- telemetry/metrics.go | 193 +++++++++++ telemetry/prometheus.go | 139 -------- 5 files changed, 481 insertions(+), 1262 deletions(-) create mode 100644 telemetry/metrics.go delete mode 100644 telemetry/prometheus.go diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025d..e797a717493 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -528,11 +528,13 @@ func NewMessenger( options := []telemetry.TelemetryClientOption{ telemetry.WithPeerID(peerId.String()), } - telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...) + telemetryClient, err = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, options...) + if err != nil { + return nil, err + } if c.wakuService != nil { c.wakuService.SetStatusTelemetryClient(telemetryClient) } - telemetryClient.Start(ctx) } messenger = &Messenger{ diff --git a/telemetry/client.go b/telemetry/client.go index 53acf95ddf8..22653094131 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -1,292 +1,56 @@ package telemetry import ( - "bytes" "context" - "encoding/json" "fmt" - "net/http" - "strings" - "sync" + "strconv" "time" "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" - "github.com/status-im/status-go/common" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/wakuv2" - wps "github.com/waku-org/go-waku/waku/v2/peerstore" - - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - v1protocol "github.com/status-im/status-go/protocol/v1" v2common "github.com/status-im/status-go/wakuv2/common" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) -type TelemetryType string - -const ( - // Bandwidth as reported by libp2p - ProtocolStatsMetric TelemetryType = "ProtocolStats" - // Envelopes sent by this node - SentEnvelopeMetric TelemetryType = "SentEnvelope" - // Change in status of a sent envelope (usually processing errors) - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - // Messages received by this node - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" - // Errors encountered when sending envelopes - ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" - // Total connections for this node at a given time - PeerCountMetric TelemetryType = "PeerCount" - // Number of failed peer connections for this node at a given time - PeerConnFailuresMetric TelemetryType = "PeerConnFailure" - // Store confirmation for a sent message successful - MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" - // Store confirmation for a sent message failed - MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" - // Total connections for this node per shard at a given time - PeerCountByShardMetric TelemetryType = "PeerCountByShard" - // Total connections for this node per discovery origin at a given time - PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" - // Error encountered when attempting to dial a peer - DialFailureMetric TelemetryType = "DialFailure" - // Missed message as detected by periodic store query - MissedMessageMetric TelemetryType = "MissedMessages" - // Missed message with a relevant filter - MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages" - // MVDS ack received for a sent message - MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" - // Total number and size of Waku messages sent by this node - SentMessageTotalMetric TelemetryType = "SentMessageTotal" -) - -const MaxRetryCache = 5000 - -type TelemetryRequest struct { - Id int `json:"id"` - TelemetryType TelemetryType `json:"telemetry_type"` - TelemetryData *json.RawMessage `json:"telemetry_data"` -} - -func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) { - c.processAndPushTelemetry(ctx, receivedMessages) -} - -func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) { - c.processAndPushTelemetry(ctx, sentEnvelope) -} - -func (c *Client) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *v2protocol.Envelope) { - c.processAndPushTelemetry(ctx, receivedEnvelope) -} - -func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { - c.processAndPushTelemetry(ctx, errorSendingEnvelope) -} - -func (c *Client) PushPeerCount(ctx context.Context, peerCount int) { - now := time.Now() - if peerCount != c.lastPeerCount && now.Sub(c.lastPeerCountTime) > 1*time.Second { - c.lastPeerCount = peerCount - c.lastPeerCountTime = now - c.processAndPushTelemetry(ctx, PeerCount{PeerCount: peerCount}) - } -} - -func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) { - for peerID, failures := range peerConnFailures { - if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { - if failures == lastFailures { - continue - } - } - c.lastPeerConnFailures[peerID] = failures - c.processAndPushTelemetry(ctx, PeerConnFailure{FailedPeerId: peerID, FailureCount: failures}) - } -} - -func (c *Client) PushMessageCheckSuccess(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageCheckSuccess{MessageHash: messageHash}) -} - -func (c *Client) PushMessageCheckFailure(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageCheckFailure{MessageHash: messageHash}) -} - -func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) { - for shard, count := range peerCountByShard { - c.processAndPushTelemetry(ctx, PeerCountByShard{Shard: shard, Count: count}) - } -} - -func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) { - for origin, count := range peerCountByOrigin { - originStr := getOriginString(origin) - v2common.PeerCountByOrigin.WithLabelValues(originStr).Set(float64(count)) - } -} - -func getOriginString(origin wps.Origin) string { - switch origin { - case wps.Unknown: - return "unknown" - case wps.Discv5: - return "discv5" - case wps.Static: - return "static" - case wps.PeerExchange: - return "peer_exchange" - case wps.DNSDiscovery: - return "dns_discovery" - case wps.Rendezvous: - return "rendezvous" - case wps.PeerManager: - return "peer_manager" - default: - return "unknown" - } -} - -func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) { - var errorMessage string = "" - if dialFailure.ErrType == v2common.ErrorUnknown { - errorMessage = dialFailure.ErrMsg - } - c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols}) -} - -func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) { - c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope}) -} - -func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) { - c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage}) -} - -func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash}) -} - -func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { - c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) -} - type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message Messages []*v1protocol.StatusMessage } -type PeerCount struct { - PeerCount int -} - -type PeerConnFailure struct { - FailedPeerId string - FailureCount int -} - -type MessageCheckSuccess struct { - MessageHash string -} - -type MessageCheckFailure struct { - MessageHash string -} - -type PeerCountByShard struct { - Shard uint16 - Count uint -} - -type PeerCountByOrigin struct { - Origin wps.Origin - Count uint -} - -type DialFailure struct { - ErrorType v2common.DialErrorType - ErrorMsg string - Protocols string -} - -type MissedMessage struct { - Envelope *v2protocol.Envelope -} - -type MissedRelevantMessage struct { - ReceivedMessage *v2common.ReceivedMessage -} - -type MessageDeliveryConfirmed struct { - MessageHash string -} - -type PrometheusMetricWrapper struct { - Typ TelemetryType - Data *json.RawMessage -} - -type SentMessageTotal struct { - Size uint32 -} - type Client struct { - serverURL string - httpClient *http.Client logger *zap.Logger keyUID string nodeName string peerId string version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration + deviceType string lastPeerCount int lastPeerCountTime time.Time lastPeerConnFailures map[string]int - deviceType string - - promMetrics *PrometheusMetrics } type TelemetryClientOption func(*Client) -func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { - return func(c *Client) { - c.sendPeriod = sendPeriod - } -} - func WithPeerID(peerId string) TelemetryClientOption { return func(c *Client) { c.peerId = peerId } } -func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { - serverURL = strings.TrimRight(serverURL, "/") +func NewClient(logger *zap.Logger, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) (*Client, error) { client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, logger: logger, keyUID: keyUID, nodeName: nodeName, version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value lastPeerCount: 0, lastPeerCountTime: time.Time{}, lastPeerConnFailures: make(map[string]int), @@ -296,393 +60,152 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str opt(client) } - promMetrics := NewPrometheusMetrics(client.processAndPushTelemetry, TelemetryRecord{NodeName: nodeName, PeerID: client.peerId, StatusVersion: version, DeviceType: client.deviceType}) - client.promMetrics = promMetrics + return client, nil +} - client.promMetrics.Register("waku_connected_peers", GaugeType, nil) - client.promMetrics.Register("waku2_envelopes_validated_total", CounterType, prometheus.Labels{}) - client.promMetrics.Register("waku_lightpush_messages", CounterType, prometheus.Labels{}) - client.promMetrics.Register("waku_lightpush_errors", CounterType, prometheus.Labels{}) - client.promMetrics.Register("waku_peer_count_by_origin", CounterType, prometheus.Labels{}) - return client +// RegisterWithRegistry registers all metrics with the provided registry +func (c *Client) RegisterWithRegistry(reg prometheus.Registerer) error { + if err := RegisterMetrics(reg); err != nil { + return fmt.Errorf("failed to register metrics: %v", err) + } + return nil } func (c *Client) SetDeviceType(deviceType string) { c.deviceType = deviceType } -func (c *Client) Start(ctx context.Context) { - go func() { - defer common.LogOnPanic() - for { - select { - case telemetryRequest := <-c.telemetryCh: - c.telemetryCacheLock.Lock() - c.telemetryCache = append(c.telemetryCache, telemetryRequest) - c.telemetryCacheLock.Unlock() - case <-ctx.Done(): - return - } - } - }() - go func() { - defer common.LogOnPanic() - sendPeriod := c.sendPeriod - timer := time.NewTimer(sendPeriod) - defer timer.Stop() - - for { - select { - case <-timer.C: - c.telemetryCacheLock.Lock() - telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache)) - copy(telemetryRequests, c.telemetryCache) - c.telemetryCache = nil - c.telemetryCacheLock.Unlock() - - if len(telemetryRequests) > 0 { - err := c.pushTelemetryRequest(telemetryRequests) - if err != nil { - if sendPeriod < 60*time.Second { //Stop the growing if the timer is > 60s to at least retry every minute - sendPeriod = sendPeriod * 2 - } - } else { - sendPeriod = c.sendPeriod - } - } - timer.Reset(sendPeriod) - case <-ctx.Done(): - return - } - } - - }() - - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - fmt.Println("exit") - return - case <-ticker.C: - c.promMetrics.Snapshot() +func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) { + MessagesReceivedTotal.WithLabelValues( + receivedMessages.Filter.PubsubTopic, + receivedMessages.Filter.ContentTopic.String(), + receivedMessages.Filter.ChatID, + ).Inc() - } - } - }() + MessagesSizeBytes.WithLabelValues( + receivedMessages.Filter.PubsubTopic, + receivedMessages.Filter.ContentTopic.String(), + ).Observe(float64(len(receivedMessages.SSHMessage.Payload))) } -func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) { - var telemetryRequest TelemetryRequest - switch v := data.(type) { - case ReceivedMessages: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: ReceivedMessagesMetric, - TelemetryData: c.ProcessReceivedMessages(v), - } - case wakuv2.SentEnvelope: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: SentEnvelopeMetric, - TelemetryData: c.ProcessSentEnvelope(v), - } - case wakuv2.ErrorSendingEnvelope: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: ErrorSendingEnvelopeMetric, - TelemetryData: c.ProcessErrorSendingEnvelope(v), - } - case PeerCount: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountMetric, - TelemetryData: c.ProcessPeerCount(v), - } - case PeerConnFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerConnFailuresMetric, - TelemetryData: c.ProcessPeerConnFailure(v), - } - case MessageCheckSuccess: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageCheckSuccessMetric, - TelemetryData: c.ProcessMessageCheckSuccess(v), - } - case MessageCheckFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageCheckFailureMetric, - TelemetryData: c.ProcessMessageCheckFailure(v), - } - case PeerCountByShard: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountByShardMetric, - TelemetryData: c.ProcessPeerCountByShard(v), - } - case PeerCountByOrigin: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountByOriginMetric, - TelemetryData: c.ProcessPeerCountByOrigin(v), - } - case DialFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: DialFailureMetric, - TelemetryData: c.ProcessDialFailure(v), - } - case MissedMessage: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MissedMessageMetric, - TelemetryData: c.ProcessMissedMessage(v), - } - case MissedRelevantMessage: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MissedRelevantMessageMetric, - TelemetryData: c.ProcessMissedRelevantMessage(v), - } - case MessageDeliveryConfirmed: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageDeliveryConfirmedMetric, - TelemetryData: c.ProcessMessageDeliveryConfirmed(v), - } - case PrometheusMetricWrapper: - pmd := data.(PrometheusMetricWrapper) - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: pmd.Typ, - TelemetryData: pmd.Data, - } - case SentMessageTotal: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: SentMessageTotalMetric, - TelemetryData: c.ProcessSentMessageTotal(v), - } - default: - c.logger.Error("Unknown telemetry data type") - return - } - - select { - case <-ctx.Done(): - return - case c.telemetryCh <- telemetryRequest: - } +func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) { + MessagesSentTotal.WithLabelValues( + sentEnvelope.Envelope.PubsubTopic(), + sentEnvelope.Envelope.Message().ContentTopic, + sentEnvelope.PublishMethod.String(), + ).Inc() - c.nextIdLock.Lock() - c.nextId++ - c.nextIdLock.Unlock() + MessagesSizeBytes.WithLabelValues( + sentEnvelope.Envelope.PubsubTopic(), + sentEnvelope.Envelope.Message().ContentTopic, + ).Observe(float64(len(sentEnvelope.Envelope.Message().Payload))) } -// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache` -func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { - if len(c.telemetryRetryCache) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time - removeNum := len(c.telemetryRetryCache) - MaxRetryCache - c.telemetryRetryCache = c.telemetryRetryCache[removeNum:] - } - c.telemetryRetryCache = append(c.telemetryRetryCache, request...) - - url := fmt.Sprintf("%s/record-metrics", c.serverURL) - body, err := json.Marshal(c.telemetryRetryCache) - if err != nil { - c.logger.Error("Error marshaling telemetry data", zap.Error(err)) - return err - } - res, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending telemetry data", zap.Error(err)) - return err - } - defer res.Body.Close() - var responseBody []map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil { - c.logger.Error("Error decoding response body", zap.Error(err)) - return err - } - if res.StatusCode != http.StatusCreated { - c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode), zap.Any("responseBody", responseBody)) - return fmt.Errorf("status code %d, response body: %v", res.StatusCode, responseBody) - } - - c.telemetryRetryCache = nil - return nil +func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { + MessageErrors.WithLabelValues( + "send_error", + errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), + errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, + ).Inc() } -func (c *Client) commonPostBody() map[string]interface{} { - return map[string]interface{}{ - "nodeName": c.nodeName, - "peerId": c.peerId, - "statusVersion": c.version, - "deviceType": c.deviceType, - "timestamp": time.Now().Unix(), +func (c *Client) PushPeerCount(ctx context.Context, peerCount int) { + now := time.Now() + if peerCount != c.lastPeerCount && now.Sub(c.lastPeerCountTime) > 1*time.Second { + c.lastPeerCount = peerCount + c.lastPeerCountTime = now + ConnectedPeers.Set(float64(peerCount)) } } -func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { - var postBody []map[string]interface{} - for _, message := range receivedMessages.Messages { - messageBody := c.commonPostBody() - messageBody["chatId"] = receivedMessages.Filter.ChatID - messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash) - messageBody["messageId"] = message.ApplicationLayer.ID - messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp - messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic - messageBody["topic"] = receivedMessages.Filter.ContentTopic.String() - messageBody["messageType"] = message.ApplicationLayer.Type.String() - messageBody["receiverKeyUID"] = c.keyUID - messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload) - postBody = append(postBody, messageBody) +func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) { + for peerID, failures := range peerConnFailures { + if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { + if failures == lastFailures { + continue + } + } + c.lastPeerConnFailures[peerID] = failures + PeerConnectionFailures.WithLabelValues(peerID).Add(float64(failures)) } - body, _ := json.Marshal(postBody) - jsonRawMessage := json.RawMessage(body) - return &jsonRawMessage -} - -func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = sentEnvelope.Envelope.Hash().String() - postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic() - postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic - postBody["senderKeyUID"] = c.keyUID - postBody["publishMethod"] = sentEnvelope.PublishMethod.String() - return c.marshalPostBody(postBody) } -func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String() - postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic() - postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic - postBody["senderKeyUID"] = c.keyUID - postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String() - postBody["error"] = errorSendingEnvelope.Error.Error() - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { - postBody := c.commonPostBody() - postBody["peerCount"] = peerCount.PeerCount - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["failedPeerId"] = peerConnFailure.FailedPeerId - postBody["failureCount"] = peerConnFailure.FailureCount - postBody["nodeKeyUID"] = c.keyUID - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMessageCheckSuccess(messageCheckSuccess MessageCheckSuccess) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageCheckSuccess.MessageHash - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage { - postBody := c.commonPostBody() - postBody["shard"] = peerCountByShard.Shard - postBody["count"] = peerCountByShard.Count - return c.marshalPostBody(postBody) +func (c *Client) PushMessageCheckSuccess(ctx context.Context, messageHash string) { + StoreQuerySuccesses.WithLabelValues(messageHash).Inc() } -func (c *Client) ProcessMessageCheckFailure(messageCheckFailure MessageCheckFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageCheckFailure.MessageHash - return c.marshalPostBody(postBody) +func (c *Client) PushMessageCheckFailure(ctx context.Context, messageHash string) { + StoreQueryFailures.WithLabelValues(messageHash).Inc() } -func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage { - postBody := c.commonPostBody() - postBody["origin"] = peerCountByOrigin.Origin - postBody["count"] = peerCountByOrigin.Count - return c.marshalPostBody(postBody) +func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) { + for shard, count := range peerCountByShard { + PeersByShard.WithLabelValues(strconv.FormatUint(uint64(shard), 10)).Set(float64(count)) + } } -func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["errorType"] = dialFailure.ErrorType - postBody["errorMsg"] = dialFailure.ErrorMsg - postBody["protocols"] = dialFailure.Protocols - return c.marshalPostBody(postBody) +func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) { + for origin, count := range peerCountByOrigin { + PeersByOrigin.WithLabelValues(getOriginString(origin)).Set(float64(count)) + } } -func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = missedMessage.Envelope.Hash().String() - postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic() - postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic - return c.marshalPostBody(postBody) +func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) { + PeerDialFailures.WithLabelValues( + dialFailure.ErrType.String(), + dialFailure.Protocols, + ).Inc() } -func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String() - postBody["sentAt"] = missedMessage.ReceivedMessage.Sent - postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic - postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic - return c.marshalPostBody(postBody) +func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) { + MissedMessages.WithLabelValues( + envelope.PubsubTopic(), + envelope.Message().ContentTopic, + ).Inc() } -func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageDeliveryConfirmed.MessageHash - return c.marshalPostBody(postBody) +func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) { + MissedMessages.WithLabelValues( + receivedMessage.PubsubTopic, + receivedMessage.ContentTopic.String(), + ).Inc() } -func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *json.RawMessage { - postBody := c.commonPostBody() - postBody["size"] = sentMessageTotal.Size - return c.marshalPostBody(postBody) +func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) { + MessageDeliveryConfirmations.WithLabelValues("", "").Inc() // TODO: Add proper labels if available } -// Helper function to marshal post body and handle errors -func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { - body, err := json.Marshal(postBody) - if err != nil { - c.logger.Error("Error marshaling post body", zap.Error(err)) - return nil - } - jsonRawMessage := json.RawMessage(body) - return &jsonRawMessage +func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { + MessagesSizeBytes.WithLabelValues("", "").Observe(float64(messageSize)) } func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { - defer common.LogOnPanic() - c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) - url := fmt.Sprintf("%s/update-envelope", c.serverURL) - var errorString = "" if processingError != nil { - errorString = processingError.Error() + MessageErrors.WithLabelValues( + "processing_error", + shhMessage.PubsubTopic, + shhMessage.Topic.String(), + ).Inc() } +} - postBody := map[string]interface{}{ - "messageHash": types.EncodeHex(shhMessage.Hash), - "sentAt": shhMessage.Timestamp, - "pubsubTopic": shhMessage.PubsubTopic, - "topic": shhMessage.Topic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "processingError": errorString, - "deviceType": c.deviceType, - } - body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err)) +func getOriginString(origin wps.Origin) string { + switch origin { + case wps.Unknown: + return "unknown" + case wps.Discv5: + return "discv5" + case wps.Static: + return "static" + case wps.PeerExchange: + return "peer_exchange" + case wps.DNSDiscovery: + return "dns_discovery" + case wps.Rendezvous: + return "rendezvous" + case wps.PeerManager: + return "peer_manager" + default: + return "unknown" } } diff --git a/telemetry/client_test.go b/telemetry/client_test.go index 7c1e6e7848d..72df121fe94 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -2,612 +2,252 @@ package telemetry import ( "context" - "encoding/json" "errors" - "net/http" - "net/http/httptest" - "os" - "slices" - "sync" + "strconv" "testing" "time" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/api/publish" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" - "github.com/status-im/status-go/protocol/tt" - v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" - "github.com/status-im/status-go/wakuv2/common" + "github.com/waku-org/go-waku/waku/v2/api/publish" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) var ( - testContentTopic = "/waku/1/0x12345679/rfc26" + testRegistry = prometheus.NewRegistry() + errTest = errors.New("test error") ) -func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected 'POST' request, got '%s'", r.Method) - } - if r.URL.EscapedPath() != "/record-metrics" { - t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) - } - - // Check the request body is as expected - var received []TelemetryRequest - err := json.NewDecoder(r.Body).Decode(&received) - if err != nil { - t.Fatal(err) - } - - if expectedCondition != nil { - shouldSucceed, shouldFail := expectedCondition(received) - if shouldFail { - w.WriteHeader(http.StatusInternalServerError) - t.Fail() - return - } - if !shouldSucceed { - w.WriteHeader(http.StatusOK) - return - } - } else { - if len(received) != 1 { - t.Errorf("Unexpected data received: %+v", received) - } else { - if received[0].TelemetryType != expectedType { - t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) - } - } - } - // If the data is as expected, respond with success - t.Log("Responding with success") - responseBody := []map[string]interface{}{ - {"status": "created"}, - } - body, err := json.Marshal(responseBody) - if err != nil { - t.Fatalf("Failed to marshal response body: %v", err) - } - w.WriteHeader(http.StatusCreated) - _, err = w.Write(body) - if err != nil { - t.Fatalf("Failed to write response body: %v", err) - } - wg.Done() - })) -} - -func createClient(t *testing.T, mockServerURL string) *Client { - config := zap.NewDevelopmentConfig() - config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - logger, err := config.Build() +func init() { + // Register metrics with test registry + err := RegisterMetrics(testRegistry) if err != nil { - t.Fatalf("Failed to create logger: %v", err) + panic("Failed to register metrics for testing: " + err.Error()) } - return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond), WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm")) } -type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) - -func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { - var wg sync.WaitGroup - wg.Add(1) // Expecting one request - - mockServer := createMockServer(t, &wg, expectedType, expectedCondition) - defer mockServer.Close() - - client := createClient(t, mockServer.URL) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - testFunc(ctx, t, client, &wg) - - // Wait for the request to be received - wg.Wait() +func createTestClient(t *testing.T) *Client { + logger := zap.NewNop() + client, err := NewClient(logger, "test-key", "test-node", "test-version") + require.NoError(t, err) + return client } -func sendEnvelope(ctx context.Context, client *Client) { - client.PushSentEnvelope(ctx, wakuv2.SentEnvelope{ - Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), - }, 0, ""), - PublishMethod: publish.LightPush, - }) +func createTestMessage(pubsubTopic string, contentTopic types.TopicType, payload []byte) *types.Message { + return &types.Message{ + PubsubTopic: pubsubTopic, + Topic: contentTopic, + Payload: payload, + } } -func TestClient_ProcessReceivedMessages(t *testing.T) { - withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Create a telemetry request to send - data := ReceivedMessages{ - Filter: transport.Filter{ - ChatID: "testChat", - PubsubTopic: "testTopic", - ContentTopic: types.StringToTopic(testContentTopic), - }, - SSHMessage: &types.Message{ - Hash: []byte("hash"), - Timestamp: uint32(time.Now().Unix()), - }, - Messages: []*v1protocol.StatusMessage{ - { - ApplicationLayer: v1protocol.ApplicationLayer{ - ID: types.HexBytes("123"), - Type: 1, - }, - }, - }, - } - - // Send the telemetry request - client.Start(ctx) - client.PushReceivedMessages(ctx, data) - }) +func getCounterValue(metric *prometheus.CounterVec, labels ...string) float64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Counter.GetValue() } -func TestClient_ProcessSentEnvelope(t *testing.T) { - withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Send the telemetry request - client.Start(ctx) - sendEnvelope(ctx, client) - }) +func getGaugeValue(metric prometheus.Gauge) float64 { + pb := &dto.Metric{} + err := metric.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Gauge.GetValue() } -var ( - testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" -) - -func TestTelemetryUponPublishError(t *testing.T) { - withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - enrTreeAddress := testENRBootstrap - envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") - if envEnrTreeAddress != "" { - enrTreeAddress = envEnrTreeAddress - } - - wakuConfig := &wakuv2.Config{} - wakuConfig.Port = 0 - wakuConfig.EnablePeerExchangeClient = true - wakuConfig.LightClient = true - wakuConfig.EnableDiscV5 = false - wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress} - wakuConfig.DiscoveryLimit = 20 - wakuConfig.ClusterID = 16 - wakuConfig.WakuNodes = []string{enrTreeAddress} - wakuConfig.TelemetryServerURL = client.serverURL - wakuConfig.TelemetrySendPeriodMs = 500 - w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil) - require.NoError(t, err) - - client.Start(ctx) - w.SetStatusTelemetryClient(client) - - // Setting this forces the publish function to fail when sending a message - w.SkipPublishToTopic(true) - - err = w.Start() - require.NoError(t, err) - - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), - } - - // This should result in a single request sent by the telemetry client - _, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg, nil) - require.NoError(t, err) - }) +func getGaugeVecValue(metric *prometheus.GaugeVec, labels ...string) float64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Gauge.GetValue() } -func TestRetryCache(t *testing.T) { - counter := 0 - var wg sync.WaitGroup - wg.Add(2) - - mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected 'POST' request, got '%s'", r.Method) - } - if r.URL.EscapedPath() != "/record-metrics" { - t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) - } - - // Check the request body is as expected - var received []TelemetryRequest - err := json.NewDecoder(r.Body).Decode(&received) - if err != nil { - t.Fatal(err) - } - - // Fail for the first request to make telemetry cache grow - if counter < 1 { - counter++ - w.WriteHeader(http.StatusInternalServerError) - wg.Done() - } else { - t.Log("Counter reached, responding with success") - if len(received) == 4 { - w.WriteHeader(http.StatusCreated) - responseBody := []map[string]interface{}{ - {"status": "created"}, - } - body, err := json.Marshal(responseBody) - if err != nil { - t.Fatalf("Failed to marshal response body: %v", err) - } - w.WriteHeader(http.StatusCreated) - _, err = w.Write(body) - if err != nil { - t.Fatalf("Failed to write response body: %v", err) - } - wg.Done() - } else { - t.Fatalf("Expected 4 metrics, got %d", len(received)-1) - } - } - })) - defer mockServer.Close() - - ctx := context.Background() - - client := createClient(t, mockServer.URL) - client.Start(ctx) - - for i := 0; i < 3; i++ { - sendEnvelope(ctx, client) +func getHistogramSampleCount(metric *prometheus.HistogramVec, labels ...string) uint64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 } + return pb.Histogram.GetSampleCount() +} - time.Sleep(110 * time.Millisecond) +func TestClient_PushReceivedMessages(t *testing.T) { + client := createTestClient(t) - require.Equal(t, 3, len(client.telemetryRetryCache)) + filter := transport.Filter{ + PubsubTopic: "test-pubsub", + ContentTopic: types.StringToTopic("test-content"), + ChatID: "test-chat", + } - sendEnvelope(ctx, client) + sshMessage := createTestMessage("test-pubsub", types.StringToTopic("test-content"), []byte("test-payload")) - wg.Wait() + receivedMessages := ReceivedMessages{ + Filter: filter, + SSHMessage: sshMessage, + } - time.Sleep(100 * time.Millisecond) + client.PushReceivedMessages(context.Background(), receivedMessages) - require.Equal(t, 0, len(client.telemetryRetryCache)) + // Verify MessagesReceivedTotal metric + value := getCounterValue(MessagesReceivedTotal, + filter.PubsubTopic, + filter.ContentTopic.String(), + filter.ChatID, + ) + require.Equal(t, float64(1), value) + + // Verify MessagesSizeBytes metric + count := getHistogramSampleCount(MessagesSizeBytes, + filter.PubsubTopic, + filter.ContentTopic.String(), + ) + require.Equal(t, uint64(1), count) } -func TestRetryCacheCleanup(t *testing.T) { - ctx := context.Background() +func TestClient_PushSentEnvelope(t *testing.T) { + client := createTestClient(t) - client := createClient(t, "") + msg := &pb.WakuMessage{ + Payload: []byte("test-payload"), + ContentTopic: "test-content", + } + envelope := v2protocol.NewEnvelope(msg, 0, "") - for i := 0; i < 6000; i++ { - go sendEnvelope(ctx, client) - telemetryRequest := <-client.telemetryCh - client.telemetryCache = append(client.telemetryCache, telemetryRequest) + sentEnvelope := wakuv2.SentEnvelope{ + Envelope: envelope, + PublishMethod: publish.LightPush, } - err := client.pushTelemetryRequest(client.telemetryCache) - // For this test case an error when pushing to the server is fine - require.Error(t, err) + client.PushSentEnvelope(context.Background(), sentEnvelope) - client.telemetryCache = nil - require.Equal(t, 6000, len(client.telemetryRetryCache)) + // Verify MessagesSentTotal metric + value := getCounterValue(MessagesSentTotal, + envelope.PubsubTopic(), + envelope.Message().ContentTopic, + sentEnvelope.PublishMethod.String(), + ) + require.Equal(t, float64(1), value) - go sendEnvelope(ctx, client) - telemetryRequest := <-client.telemetryCh - client.telemetryCache = append(client.telemetryCache, telemetryRequest) + // Verify MessagesSizeBytes metric + count := getHistogramSampleCount(MessagesSizeBytes, + envelope.PubsubTopic(), + envelope.Message().ContentTopic, + ) + require.Equal(t, uint64(1), count) +} - err = client.pushTelemetryRequest(client.telemetryCache) - require.Error(t, err) +func TestClient_PushPeerCount(t *testing.T) { + client := createTestClient(t) - telemetryRequests := make([]TelemetryRequest, len(client.telemetryCache)) - copy(telemetryRequests, client.telemetryCache) - client.telemetryCache = nil + // First update + client.PushPeerCount(context.Background(), 5) + value := getGaugeValue(ConnectedPeers) + require.Equal(t, float64(5), value) - err = client.pushTelemetryRequest(telemetryRequests) - require.Error(t, err) + // Second update within 1 second should not change the metric + client.PushPeerCount(context.Background(), 10) + value = getGaugeValue(ConnectedPeers) + require.Equal(t, float64(5), value) - require.Equal(t, 5001, len(client.telemetryRetryCache)) + // Wait for more than 1 second and update + time.Sleep(1100 * time.Millisecond) + client.PushPeerCount(context.Background(), 10) + value = getGaugeValue(ConnectedPeers) + require.Equal(t, float64(10), value) } -func setDefaultConfig(config *wakuv2.Config, lightMode bool) { - config.ClusterID = 16 - - if lightMode { - config.EnablePeerExchangeClient = true - config.LightClient = true - config.EnableDiscV5 = false - } else { - config.EnableDiscV5 = true - config.EnablePeerExchangeServer = true - config.LightClient = false - config.EnablePeerExchangeClient = false - } -} +func TestClient_PushPeerCountByOrigin(t *testing.T) { + client := createTestClient(t) -var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" + peerCountByOrigin := map[wps.Origin]uint{ + wps.Discv5: 5, + wps.Static: 3, + wps.PeerExchange: 2, + } -func TestPeerCount(t *testing.T) { - // t.Skip("flaky test") + client.PushPeerCountByOrigin(context.Background(), peerCountByOrigin) - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - t.Log(req) - return req.TelemetryType == PeerCountMetric - }) - return found, false + // Verify metrics for each origin + for origin, expectedCount := range peerCountByOrigin { + value := getGaugeVecValue(PeersByOrigin, getOriginString(origin)) + require.Equal(t, float64(expectedCount), value) } - withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetrySendPeriodMs = 1500 - config.TelemetryPeerCountSendPeriod = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) } -func TestPeerId(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - var data map[string]interface{} +func TestClient_PushPeerCountByShard(t *testing.T) { + client := createTestClient(t) - err := json.Unmarshal(*received[0].TelemetryData, &data) - if err != nil { - return false, true - } - - _, ok := data["peerId"] - require.True(t, ok) - return ok, false + peerCountByShard := map[uint16]uint{ + 1: 5, + 2: 3, + 3: 2, } - withMockServer(t, SentEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Send the telemetry request - client.Start(ctx) - sendEnvelope(ctx, client) - - }) -} + client.PushPeerCountByShard(context.Background(), peerCountByShard) -func TestPeerCountByShard(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - return req.TelemetryType == PeerCountByShardMetric - }) - return found, false + // Verify metrics for each shard + for shard, expectedCount := range peerCountByShard { + value := getGaugeVecValue(PeersByShard, strconv.FormatUint(uint64(shard), 10)) + require.Equal(t, float64(expectedCount), value) } - withMockServer(t, PeerCountByShardMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetryPeerCountSendPeriod = 1500 - config.TelemetrySendPeriodMs = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) -} - -func TestPeerCountByOrigin(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - return req.TelemetryType == PeerCountByOriginMetric - }) - return found, false - } - withMockServer(t, PeerCountByOriginMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetryPeerCountSendPeriod = 1500 - config.TelemetrySendPeriodMs = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) -} - -type testCase struct { - name string - input interface{} - expectedType TelemetryType - expectedFields map[string]interface{} } -func runTestCase(t *testing.T, tc testCase) { - ctx := context.Background() - client := createClient(t, "") - - go client.processAndPushTelemetry(ctx, tc.input) - - telemetryRequest := <-client.telemetryCh - - require.Equal(t, tc.expectedType, telemetryRequest.TelemetryType, "Unexpected telemetry type") - - var telemetryData map[string]interface{} - err := json.Unmarshal(*telemetryRequest.TelemetryData, &telemetryData) - require.NoError(t, err, "Failed to unmarshal telemetry data") +func TestClient_PushErrorSendingEnvelope(t *testing.T) { + client := createTestClient(t) - for key, value := range tc.expectedFields { - require.Equal(t, value, telemetryData[key], "Unexpected value for %s", key) + msg := &pb.WakuMessage{ + Payload: []byte("test-payload"), + ContentTopic: "test-content", } + envelope := v2protocol.NewEnvelope(msg, 0, "") - require.Contains(t, telemetryData, "nodeName", "Missing nodeName in telemetry data") - require.Contains(t, telemetryData, "peerId", "Missing peerId in telemetry data") - require.Contains(t, telemetryData, "statusVersion", "Missing statusVersion in telemetry data") - require.Contains(t, telemetryData, "deviceType", "Missing deviceType in telemetry data") - require.Contains(t, telemetryData, "timestamp", "Missing timestamp in telemetry data") - - // Simulate pushing the telemetry request - client.telemetryCache = append(client.telemetryCache, telemetryRequest) - - err = client.pushTelemetryRequest(client.telemetryCache) - // For this test case, we expect an error when pushing to the server - require.Error(t, err) - - // Verify that the request is now in the retry cache - require.Equal(t, 1, len(client.telemetryRetryCache), "Expected one item in telemetry retry cache") -} - -func TestProcessMessageDeliveryConfirmed(t *testing.T) { - tc := testCase{ - name: "MessageDeliveryConfirmed", - input: MessageDeliveryConfirmed{ - MessageHash: "0x1234567890abcdef", - }, - expectedType: MessageDeliveryConfirmedMetric, - expectedFields: map[string]interface{}{ - "messageHash": "0x1234567890abcdef", + errorSendingEnvelope := wakuv2.ErrorSendingEnvelope{ + SentEnvelope: wakuv2.SentEnvelope{ + Envelope: envelope, + PublishMethod: publish.LightPush, }, + Error: errTest, } - runTestCase(t, tc) -} -func TestProcessMissedRelevantMessage(t *testing.T) { - now := time.Now() - message := common.NewReceivedMessage( - v2protocol.NewEnvelope( - &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(now.Unix()), - }, 0, ""), - common.MissingMessageType, - ) - tc := testCase{ - name: "MissedRelevantMessage", - input: MissedRelevantMessage{ - ReceivedMessage: message, - }, - expectedType: MissedRelevantMessageMetric, - expectedFields: map[string]interface{}{ - "messageHash": message.Envelope.Hash().String(), - "pubsubTopic": "", - "contentTopic": "0x12345679", - }, - } - runTestCase(t, tc) -} + client.PushErrorSendingEnvelope(context.Background(), errorSendingEnvelope) -func TestProcessMissedMessage(t *testing.T) { - now := time.Now() - message := common.NewReceivedMessage( - v2protocol.NewEnvelope( - &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(now.Unix()), - }, 0, ""), - common.MissingMessageType, + value := getCounterValue(MessageErrors, + "send_error", + envelope.PubsubTopic(), + envelope.Message().ContentTopic, ) - tc := testCase{ - name: "MissedMessage", - input: MissedMessage{ - Envelope: message.Envelope, - }, - expectedType: MissedMessageMetric, - expectedFields: map[string]interface{}{ - "messageHash": message.Envelope.Hash().String(), - "pubsubTopic": "", - "contentTopic": message.Envelope.Message().ContentTopic, - }, - } - runTestCase(t, tc) + require.Equal(t, float64(1), value) } -func TestProcessDialFailure(t *testing.T) { - tc := testCase{ - name: "DialFailure", - input: DialFailure{ - ErrorType: common.ErrorUnknown, - ErrorMsg: "test error message", - Protocols: "test-protocols", - }, - expectedType: DialFailureMetric, - expectedFields: map[string]interface{}{ - "errorType": float64(common.ErrorUnknown), - "errorMsg": "test error message", - "protocols": "test-protocols", - }, - } - runTestCase(t, tc) -} +func TestClient_UpdateEnvelopeProcessingError(t *testing.T) { + client := createTestClient(t) -func TestProcessSentMessageTotal(t *testing.T) { - tc := testCase{ - name: "SentMessageTotal", - input: SentMessageTotal{ - Size: uint32(1234), - }, - expectedType: SentMessageTotalMetric, - expectedFields: map[string]interface{}{ - "size": float64(1234), - }, - } - runTestCase(t, tc) + sshMessage := createTestMessage("test-pubsub", types.StringToTopic("test-content"), []byte("test-payload")) + + client.UpdateEnvelopeProcessingError(sshMessage, errTest) + + value := getCounterValue(MessageErrors, + "processing_error", + sshMessage.PubsubTopic, + sshMessage.Topic.String(), + ) + require.Equal(t, float64(1), value) } diff --git a/telemetry/metrics.go b/telemetry/metrics.go new file mode 100644 index 00000000000..10ab8750f50 --- /dev/null +++ b/telemetry/metrics.go @@ -0,0 +1,193 @@ +package telemetry + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // Message Metrics + + // MessagesSentTotal tracks the total number of messages sent by this node + MessagesSentTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_messages_sent_total", + Help: "Total number of messages sent by this node", + }, + []string{"pubsub_topic", "content_topic", "publish_method"}, + ) + + // MessagesReceivedTotal tracks the total number of messages received by this node + MessagesReceivedTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_messages_received_total", + Help: "Total number of messages received by this node", + }, + []string{"pubsub_topic", "content_topic", "chat_id"}, + ) + + // MessagesSizeBytes tracks the size distribution of messages + MessagesSizeBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "waku_message_size_bytes", + Help: "Size of messages in bytes", + Buckets: prometheus.ExponentialBuckets(64, 2, 10), // From 64B to ~32KB + }, + []string{"pubsub_topic", "content_topic"}, + ) + + // MessageErrors tracks various message-related errors + MessageErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_message_errors_total", + Help: "Total number of message errors by type", + }, + []string{"error_type", "pubsub_topic", "content_topic"}, + ) + + // MessageDeliveryConfirmations tracks successful message deliveries + MessageDeliveryConfirmations = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_message_delivery_confirmations_total", + Help: "Total number of message delivery confirmations", + }, + []string{"pubsub_topic", "content_topic"}, + ) + + // Peer Metrics + + // ConnectedPeers tracks the current number of connected peers + ConnectedPeers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_connected_peers", + Help: "Current number of connected peers", + }, + ) + + // PeersByOrigin tracks the number of peers by discovery origin + PeersByOrigin = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "waku_peers_by_origin", + Help: "Number of peers by discovery origin", + }, + []string{"origin"}, + ) + + // PeersByShard tracks the number of peers by shard + PeersByShard = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "waku_peers_by_shard", + Help: "Number of peers by shard", + }, + []string{"shard"}, + ) + + // PeerConnectionFailures tracks peer connection failures + PeerConnectionFailures = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_peer_connection_failures_total", + Help: "Total number of peer connection failures", + }, + []string{"peer_id"}, + ) + + // PeerDialFailures tracks peer dial failures by error type + PeerDialFailures = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_peer_dial_failures_total", + Help: "Total number of peer dial failures by error type", + }, + []string{"error_type", "protocols"}, + ) + + // Store Metrics + + // StoreQuerySuccesses tracks successful store queries + StoreQuerySuccesses = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_store_query_successes_total", + Help: "Total number of successful store queries", + }, + []string{"message_hash"}, + ) + + // StoreQueryFailures tracks failed store queries + StoreQueryFailures = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_store_query_failures_total", + Help: "Total number of failed store queries", + }, + []string{"message_hash"}, + ) + + // MissedMessages tracks messages that were missed (detected by store query) + MissedMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_missed_messages_total", + Help: "Total number of missed messages detected by store query", + }, + []string{"pubsub_topic", "content_topic"}, + ) + + // Protocol Metrics + + // ProtocolBandwidthBytes tracks bandwidth usage by protocol + ProtocolBandwidthBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_protocol_bandwidth_bytes_total", + Help: "Total bandwidth usage in bytes by protocol and direction", + }, + []string{"protocol", "direction"}, // direction: in/out + ) + + // LightPushMessages tracks light push protocol usage + LightPushMessages = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_lightpush_messages_total", + Help: "Total number of messages sent via light push", + }, + ) + + // LightPushErrors tracks light push protocol errors + LightPushErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_lightpush_errors_total", + Help: "Total number of light push errors", + }, + ) +) + +// Common label values +const ( + DirectionIn = "in" + DirectionOut = "out" +) + +// RegisterMetrics registers all metrics with the provided registry +func RegisterMetrics(reg prometheus.Registerer) error { + collectors := []prometheus.Collector{ + MessagesSentTotal, + MessagesReceivedTotal, + MessagesSizeBytes, + MessageErrors, + MessageDeliveryConfirmations, + ConnectedPeers, + PeersByOrigin, + PeersByShard, + PeerConnectionFailures, + PeerDialFailures, + StoreQuerySuccesses, + StoreQueryFailures, + MissedMessages, + ProtocolBandwidthBytes, + LightPushMessages, + LightPushErrors, + } + + for _, collector := range collectors { + if err := reg.Register(collector); err != nil { + return err + } + } + + return nil +} diff --git a/telemetry/prometheus.go b/telemetry/prometheus.go deleted file mode 100644 index 79f7168433f..00000000000 --- a/telemetry/prometheus.go +++ /dev/null @@ -1,139 +0,0 @@ -package telemetry - -import ( - "context" - "encoding/json" - "log" - "time" - - "github.com/prometheus/client_golang/prometheus" - prom_model "github.com/prometheus/client_model/go" -) - -type MetricType int - -const ( - _ MetricType = iota - CounterType - GaugeType -) - -type TelemetryRecord struct { - NodeName string `json:"nodeName"` - PeerID string `json:"peerId"` - StatusVersion string `json:"statusVersion"` - DeviceType string `json:"deviceType"` -} - -type ProcessTelemetryRequest func(ctx context.Context, data interface{}) - -type MetricPayload struct { - Name string - Value []*prom_model.Metric -} - -type Metric struct { - typ MetricType - labels map[string]string -} - -type PrometheusMetrics struct { - metrics map[string]Metric - process ProcessTelemetryRequest - telemetryRecord TelemetryRecord -} - -func NewPrometheusMetrics(process ProcessTelemetryRequest, tc TelemetryRecord) *PrometheusMetrics { - return &PrometheusMetrics{ - metrics: make(map[string]Metric), - process: process, - telemetryRecord: tc, - } -} - -func (pm *PrometheusMetrics) Register(name string, typ MetricType, labels prometheus.Labels) { - pm.metrics[name] = Metric{typ, labels} -} - -func (pm *PrometheusMetrics) Snapshot() { - gatherer := prometheus.DefaultGatherer - metrics, err := gatherer.Gather() - if err != nil { - log.Fatalf("Failed to gather metrics: %v", err) - } - - for _, mf := range metrics { - metric, ok := pm.metrics[*mf.Name] - if !ok { - continue - } - - metricFamilyValue := mf.GetMetric() - - if len(metricFamilyValue) == 0 { - continue - } - - metricValue := []*prom_model.Metric{} - - if metric.labels != nil { //filter out metrics based on labels - for _, m := range mf.GetMetric() { - - matchCnt := len(metric.labels) - - for name, value := range metric.labels { - for _, label := range m.GetLabel() { - if name == *label.Name && value == *label.Value { - matchCnt-- - } - } - } - - if matchCnt > 0 { - continue - } - - metricValue = append(metricValue, m) - - } - } else { - metricValue = metricFamilyValue - } - - if len(metricValue) == 0 { - continue - } - - p := MetricPayload{Name: *mf.Name, Value: metricValue} - - pm.ToTelemetryRequest(p) - } - -} - -func (pm *PrometheusMetrics) ToTelemetryRequest(p MetricPayload) error { - postBody := map[string]interface{}{ - "value": p.Value, - "name": p.Name, - "nodeName": pm.telemetryRecord.NodeName, - "deviceType": pm.telemetryRecord.DeviceType, - "peerId": pm.telemetryRecord.PeerID, - "statusVersion": pm.telemetryRecord.StatusVersion, - "timestamp": time.Now().Unix(), - } - - telemtryData, err := json.Marshal(postBody) - if err != nil { - return err - } - - rawData := json.RawMessage(telemtryData) - - wrap := PrometheusMetricWrapper{ - Typ: "PrometheusMetric", - Data: &rawData, - } - - pm.process(context.Background(), wrap) - return nil -}