Skip to content

Commit

Permalink
Metrics framework churner (#934)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Nov 27, 2024
1 parent b3a90d1 commit 53fd80d
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 72 deletions.
4 changes: 4 additions & 0 deletions metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# EigenDA Metrics Documentation

- [churner](operators/churner/churner-metrics.md)

33 changes: 33 additions & 0 deletions operators/churner/churner-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Metrics Documentation for namespace 'eigenda_churner'

This documentation was automatically generated at time `2024-11-26T14:29:13-06:00`

There are a total of `2` registered metrics.

---

## latency_ms

latency summary in milliseconds

| | |
|---|---|
| **Name** | `latency` |
| **Unit** | `ms` |
| **Labels** | `method` |
| **Type** | `latency` |
| **Quantiles** | `0.500`, `0.900`, `0.950`, `0.990` |
| **Fully Qualified Name** | `eigenda_churner_latency_ms` |
---

## request_count

the number of requests

| | |
|---|---|
| **Name** | `request` |
| **Unit** | `count` |
| **Labels** | `status`, `method`, `reason` |
| **Type** | `counter` |
| **Fully Qualified Name** | `eigenda_churner_request_count` |
3 changes: 2 additions & 1 deletion operators/churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestProcessChurnRequest(t *testing.T) {
NumRetries: numRetries,
},
}
metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
assert.NoError(t, err)
assert.NotNil(t, cn)
Expand Down
5 changes: 4 additions & 1 deletion operators/churner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func run(ctx *cli.Context) error {
logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)

metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
metrics, err := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
if err != nil {
log.Fatalf("failed to create metrics: %v", err)
}

cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion operators/churner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
ChurnApprovalInterval: ctx.GlobalDuration(flags.ChurnApprovalInterval.Name),
MetricsConfig: MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
HTTPPort: ctx.GlobalInt(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
}, nil
Expand Down
4 changes: 2 additions & 2 deletions operators/churner/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
}
/* Optional Flags*/
MetricsHTTPPort = cli.StringFlag{
MetricsHTTPPort = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Required: false,
Value: "9100",
Value: 9100,
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
ChurnApprovalInterval = cli.DurationFlag{
Expand Down
24 changes: 24 additions & 0 deletions operators/churner/mdoc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/operators/churner"
)

// main generates documentation for churner metrics.
func main() {
logger, err := common.NewLogger(common.DefaultLoggerConfig())
if err != nil {
panic(err)
}

metrics, err := churner.NewMetrics(0, logger)
if err != nil {
panic(err)
}

err = metrics.WriteMetricsDocumentation()
if err != nil {
panic(err)
}
}
124 changes: 62 additions & 62 deletions operators/churner/metrics.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package churner

import (
"context"
"fmt"
"net/http"
"github.com/Layr-Labs/eigenda/common/metrics"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

Expand All @@ -28,7 +25,7 @@ const (
)

// Note: statusCodeMap must be maintained in sync with failure reason constants.
var statusCodeMap map[FailReason]string = map[FailReason]string{
var statusCodeMap = map[FailReason]string{
FailReasonRateLimitExceeded: codes.ResourceExhausted.String(),
FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(),
FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(),
Expand All @@ -40,63 +37,80 @@ var statusCodeMap map[FailReason]string = map[FailReason]string{
}

type MetricsConfig struct {
HTTPPort string
HTTPPort int
EnableMetrics bool
}

type Metrics struct {
registry *prometheus.Registry
metricsServer metrics.Metrics

NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec
numRequests metrics.CountMetric
latency metrics.LatencyMetric

httpPort string
logger logging.Logger
logger logging.Logger
}

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
namespace := "eigenda_churner"
type latencyLabel struct {
method string
}

type numRequestsLabel struct {
status string
method string
reason string
}

func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "requests",
Help: "the number of requests",
},
[]string{"status", "reason", "method"},
),
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "latency_ms",
Help: "latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"method"},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "ChurnerMetrics"),
metricsServer := metrics.NewMetrics(logger, &metrics.Config{
Namespace: "eigenda_churner",
HTTPPort: httpPort,
})

numRequests, err := metricsServer.NewCountMetric(
"request",
"the number of requests",
numRequestsLabel{})
if err != nil {
return nil, err
}
return metrics

latency, err := metricsServer.NewLatencyMetric(
"latency",
"latency summary in milliseconds",
latencyLabel{},
&metrics.Quantile{Quantile: 0.5, Error: 0.05},
&metrics.Quantile{Quantile: 0.9, Error: 0.01},
&metrics.Quantile{Quantile: 0.95, Error: 0.01},
&metrics.Quantile{Quantile: 0.99, Error: 0.001})
if err != nil {
return nil, err
}

return &Metrics{
metricsServer: metricsServer,
numRequests: numRequests,
latency: latency,
logger: logger.With("component", "ChurnerMetrics"),
}, nil
}

// ObserveLatency observes the latency of a stage in 'stage
func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
g.Latency.WithLabelValues(method).Observe(latencyMs)
// WriteMetricsDocumentation writes the metrics for the churner to a markdown file.
func (g *Metrics) WriteMetricsDocumentation() error {
return g.metricsServer.WriteMetricsDocumentation("operators/churner/churner-metrics.md")
}

// ObserveLatency observes the latency of a stage
func (g *Metrics) ObserveLatency(method string, latency time.Duration) {
g.latency.ReportLatency(latency, latencyLabel{method: method})
}

// IncrementSuccessfulRequestNum increments the number of successful requests
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
"status": "success",
"method": method,
"reason": "",
}).Inc()
g.numRequests.Increment(numRequestsLabel{status: "success", method: method})
}

// IncrementFailedRequestNum increments the number of failed requests
Expand All @@ -108,25 +122,11 @@ func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
// handle a negligence of mapping from failure reason to status code.
code = codes.Internal.String()
}
g.NumRequests.With(prometheus.Labels{
"status": code,
"reason": string(reason),
"method": method,
}).Inc()

g.numRequests.Increment(numRequestsLabel{status: code, reason: string(reason), method: method})
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)
go func() {
log := g.logger
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
g.registry,
promhttp.HandlerOpts{},
))
err := http.ListenAndServe(addr, mux)
log.Error("Prometheus server failed", "err", err)
}()
func (g *Metrics) Start() error {
return g.metricsServer.Start()
}
10 changes: 7 additions & 3 deletions operators/churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ func NewServer(
func (s *Server) Start(metricsConfig MetricsConfig) error {
// Enable Metrics Block
if metricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
s.metrics.Start(context.Background())
httpSocket := fmt.Sprintf(":%d", metricsConfig.HTTPPort)
err := s.metrics.Start()
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}

s.logger.Info("Enabled metrics for Churner", "socket", httpSocket)
}
return nil
Expand All @@ -62,7 +66,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
}

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds
s.metrics.ObserveLatency("Churn", time.Duration(f*float64(time.Second)))
}))
defer timer.ObserveDuration()
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())
Expand Down
3 changes: 2 additions & 1 deletion operators/churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ func newTestServer(t *testing.T) *churner.Server {

setupMockWriter()

metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
Expand Down
3 changes: 2 additions & 1 deletion operators/churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ func newTestServer(t *testing.T) *churner.Server {
)
assert.NoError(t, err)

metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
assert.NoError(t, err)

Expand Down

0 comments on commit 53fd80d

Please sign in to comment.