Fix thread busy metrics in llo mercury transmitter
samsondav committed Dec 12, 2024
1 parent 3090b59 commit d6d07d7
Showing 2 changed files with 49 additions and 43 deletions.
core/services/llo/mercurytransmitter/server.go (59 changes: 48 additions & 11 deletions)
@@ -62,6 +62,22 @@ var (
},
[]string{"donID", "serverURL", "code"},
)
+ promTransmitConcurrentTransmitGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "llo",
+ Subsystem: "mercurytransmitter",
+ Name: "concurrent_transmit_gauge",
+ Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
+ },
+ []string{"donID", "serverURL"},
+ )
+ promTransmitConcurrentDeleteGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "llo",
+ Subsystem: "mercurytransmitter",
+ Name: "concurrent_delete_gauge",
+ Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
+ },
+ []string{"donID", "serverURL"},
+ )
)

type ReportPacker interface {
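Note on the pattern: because the new gauge vectors are created via promauto, they are registered with prometheus.DefaultRegisterer when the package initializes, which is presumably why the manual collector registration and unregistration removed from transmitter.go further down is no longer needed. A minimal, self-contained sketch of that promauto pattern follows; the metric names, label values, and port are illustrative placeholders, not the repository's actual values.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// busyGauge is registered with the default registerer as a side effect of
// promauto.NewGaugeVec; no explicit Register/Unregister calls are needed.
// Names here are hypothetical, for illustration only.
var busyGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "example",
	Subsystem: "worker",
	Name:      "busy_threads",
	Help:      "Number of worker threads currently busy.",
}, []string{"serverURL"})

func main() {
	// Set a labeled child; it shows up on /metrics without further wiring.
	busyGauge.WithLabelValues("wss://example.invalid").Set(3)

	// promhttp.Handler() exposes whatever the default registerer holds.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}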
@@ -87,12 +103,14 @@ type server struct {
evmPremiumLegacyPacker ReportPacker
jsonPacker ReportPacker

- transmitSuccessCount prometheus.Counter
- transmitDuplicateCount prometheus.Counter
- transmitConnectionErrorCount prometheus.Counter
- transmitQueueDeleteErrorCount prometheus.Counter
- transmitQueueInsertErrorCount prometheus.Counter
- transmitQueuePushErrorCount prometheus.Counter
+ transmitSuccessCount prometheus.Counter
+ transmitDuplicateCount prometheus.Counter
+ transmitConnectionErrorCount prometheus.Counter
+ transmitQueueDeleteErrorCount prometheus.Counter
+ transmitQueueInsertErrorCount prometheus.Counter
+ transmitQueuePushErrorCount prometheus.Counter
+ transmitConcurrentTransmitGauge prometheus.Gauge
+ transmitConcurrentDeleteGauge prometheus.Gauge

transmitThreadBusyCount atomic.Int32
deleteThreadBusyCount atomic.Int32
@@ -130,6 +148,8 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
+ promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
+ promTransmitConcurrentDeleteGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
atomic.Int32{},
}
@@ -161,7 +181,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup
select {
case hash := <-s.deleteQueue:
for {
- s.deleteThreadBusyCount.Add(1)
+ s.deleteThreadBusyCountInc()
if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil {
s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash)
s.transmitQueueDeleteErrorCount.Inc()
@@ -170,7 +190,7 @@
// Wait a backoff duration before trying to delete again
continue
case <-stopCh:
- s.deleteThreadBusyCount.Add(-1)
+ s.deleteThreadBusyCountDec()
// abort and return immediately on stop even if items remain in queue
return
}
@@ -179,14 +199,31 @@
}
// success
b.Reset()
- s.deleteThreadBusyCount.Add(-1)
+ s.deleteThreadBusyCountDec()
case <-stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
}

+ func (s *server) transmitThreadBusyCountInc() {
+ val := s.transmitThreadBusyCount.Add(1)
+ s.transmitConcurrentTransmitGauge.Set(float64(val))
+ }
+ func (s *server) transmitThreadBusyCountDec() {
+ val := s.transmitThreadBusyCount.Add(-1)
+ s.transmitConcurrentTransmitGauge.Set(float64(val))
+ }
+ func (s *server) deleteThreadBusyCountInc() {
+ val := s.deleteThreadBusyCount.Add(1)
+ s.transmitConcurrentDeleteGauge.Set(float64(val))
+ }
+ func (s *server) deleteThreadBusyCountDec() {
+ val := s.deleteThreadBusyCount.Add(-1)
+ s.transmitConcurrentDeleteGauge.Set(float64(val))
+ }
+
func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donIDStr string) {
defer wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
@@ -208,8 +245,8 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI
return false
}

- s.transmitThreadBusyCount.Add(1)
- defer s.transmitThreadBusyCount.Add(-1)
+ s.transmitThreadBusyCountInc()
+ defer s.transmitThreadBusyCountDec()

req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) {
ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout))
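The fix routes every busy-count change through a helper that mirrors the atomic counter into the corresponding gauge, so the exported metric cannot drift from the counter (previously the gauges were never wired up at all). A rough standalone sketch of that pattern follows; busyTracker and the metric name are hypothetical, not the repository's actual server type.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// busyTracker pairs an atomic counter with a Prometheus gauge; every
// increment or decrement updates both, so the gauge always reflects the
// number of operations currently in flight.
type busyTracker struct {
	count atomic.Int32
	gauge prometheus.Gauge
}

func (b *busyTracker) inc() { b.gauge.Set(float64(b.count.Add(1))) }
func (b *busyTracker) dec() { b.gauge.Set(float64(b.count.Add(-1))) }

func main() {
	t := &busyTracker{gauge: prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_busy_threads"})}
	t.inc()
	defer t.dec()
	fmt.Println("busy threads:", t.count.Load()) // 1 while the work is in flight
}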
core/services/llo/mercurytransmitter/transmitter.go (33 changes: 1 addition & 32 deletions)
@@ -116,7 +116,6 @@ type transmitter struct {
orm ORM
servers map[string]*server
registerer prometheus.Registerer
- collectors []prometheus.Collector

donID uint32
fromAccount string
@@ -194,31 +193,6 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr)
}
- // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
- // prometheus.GaugeOpts{
- // Namespace: "llo",
- // Subsystem: "mercurytransmitter",
- // Name: "concurrent_transmit_gauge",
- // Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
- // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)},
- // }, func() float64 {
- // return float64(s.transmitThreadBusyCount.Load())
- // }))
- // mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc(
- // prometheus.GaugeOpts{
- // Namespace: "llo",
- // Subsystem: "mercurytransmitter",
- // Name: "concurrent_delete_gauge",
- // Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.",
- // ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)},
- // }, func() float64 {
- // return float64(s.deleteThreadBusyCount.Load())
- // }))
- // for i, c := range mt.collectors {
- // if err := mt.registerer.Register(c); err != nil {
- // return fmt.Errorf("failed to register prometheus collector %d: %w", i, err)
- // }
- // }
}
if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil {
return err
@@ -250,12 +224,7 @@ func (mt *transmitter) Close() error {
closers = append(closers, s.pm)
closers = append(closers, s.c)
}
- err := services.CloseAll(closers...)
- // Unregister all the gauge funcs
- for _, c := range mt.collectors {
- mt.registerer.Unregister(c)
- }
- return err
+ return services.CloseAll(closers...)
})
}
