Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull ejector out of dataapi #608

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
"github.com/Layr-Labs/eigenda/operators/ejector"
walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/signerv2"
Expand Down Expand Up @@ -124,7 +125,7 @@ func RunDataApi(ctx *cli.Context) error {
subgraphClient,
tx,
chainState,
dataapi.NewEjector(wallet, client, logger, tx, metrics, config.TxnTimeout, config.NonsigningRateThreshold),
ejector.NewEjector(wallet, client, logger, tx, metrics.EjectorMetrics, config.TxnTimeout, config.NonsigningRateThreshold),
logger,
metrics,
nil,
Expand Down
101 changes: 8 additions & 93 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/operators/ejector"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type MetricsConfig struct {
Expand All @@ -23,14 +23,9 @@ type MetricsConfig struct {
type Metrics struct {
registry *prometheus.Registry

NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec

PeriodicEjectionRequests *prometheus.CounterVec
UrgentEjectionRequests *prometheus.CounterVec
OperatorsToEject *prometheus.CounterVec
StakeShareToEject *prometheus.GaugeVec
EjectionGasUsed prometheus.Gauge
NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec
EjectorMetrics *ejector.Metrics

httpPort string
logger logging.Logger
Expand Down Expand Up @@ -60,58 +55,10 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string,
},
[]string{"method"},
),
// PeriodicEjectionRequests is a more detailed metric than NumRequests, specifically for
// tracking the ejection calls that are periodically initiated according to the SLA
// evaluation time window.
PeriodicEjectionRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "periodic_ejection_requests_total",
Help: "the total number of periodic ejection requests",
},
[]string{"status"},
),
// UrgentEjectionRequests is a more detailed metric than NumRequests, specifically for
// tracking the ejection calls that are urgently initiated due to bad network health
// condition.
UrgentEjectionRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "urgent_ejection_requests_total",
Help: "the total number of urgent ejection requests",
},
[]string{"status"},
),
// The number of operators requested to eject. Note this may be different than the
// actual number of operators ejected as EjectionManager contract may perform rate
// limiting.
OperatorsToEject: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "operators_to_eject",
Help: "the total number of operators requested to eject",
}, []string{"quorum"},
),
// The total stake share requested to eject. Note this may be different than the
// actual stake share ejected as EjectionManager contract may perform rate limiting.
StakeShareToEject: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "stake_share_to_eject",
Help: "the total stake share requested to eject",
}, []string{"quorum"},
),
// The gas used by EjectionManager contract for operator ejection.
EjectionGasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "ejection_gas_used",
Help: "Gas used for operator ejection",
},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "DataAPIMetrics"),
EjectorMetrics: ejector.NewMetrics(reg, logger),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "DataAPIMetrics"),
}
return metrics
}
Expand All @@ -137,38 +84,6 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
}).Inc()
}

func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) {
switch mode {
case "periodic":
g.PeriodicEjectionRequests.With(prometheus.Labels{
"status": status.String(),
}).Inc()
case "urgent":
g.UrgentEjectionRequests.With(prometheus.Labels{
"status": status.String(),
}).Inc()
}
}

func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]int, stakeShareByQuorum map[uint8]float64) {
for q, count := range numOperatorsByQuorum {
for i := 0; i < count; i++ {
g.OperatorsToEject.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
}).Inc()
}
}
for q, stakeShare := range stakeShareByQuorum {
g.StakeShareToEject.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
}).Set(stakeShare)
}
}

func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) {
g.EjectionGasUsed.Set(float64(gasUsed))
}

// IncrementNotFoundRequestNum increments the number of not found requests
func (g *Metrics) IncrementNotFoundRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
Expand Down
34 changes: 19 additions & 15 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/operators/ejector"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -94,10 +94,6 @@ type (
Timestamp uint64 `json:"timestamp"`
}

EjectionResponse struct {
TransactionHash string `json:"transaction_hash"`
}

Meta struct {
Size int `json:"size"`
}
Expand Down Expand Up @@ -170,7 +166,7 @@ type (
subgraphClient SubgraphClient
transactor core.Transactor
chainState core.ChainState
ejector *Ejector
ejector *ejector.Ejector
ejectionToken string

metrics *Metrics
Expand All @@ -189,7 +185,7 @@ func NewServer(
subgraphClient SubgraphClient,
transactor core.Transactor,
chainState core.ChainState,
ejector *Ejector,
ejector *ejector.Ejector,
logger logging.Logger,
metrics *Metrics,
grpcConn GRPCConn,
Expand Down Expand Up @@ -344,9 +340,9 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
return
}

mode := "periodic"
if c.Query("mode") != "" {
mode = c.Query("mode")
mode := ejector.PeriodicMode
if c.Query("mode") == "urgent" {
mode = ejector.UrgentMode
}

endTime := time.Now()
Expand All @@ -355,7 +351,6 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end"))
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.InvalidArgument)
errorResponse(c, err)
return
}
Expand All @@ -367,18 +362,27 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
}

nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix(), true)
var ejectionResponse *EjectionResponse
var ejectionResponse *ejector.EjectionResponse
if err == nil {
ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningRate)
nonSigningMetrics := make([]*ejector.NonSignerMetric, 0)
for _, metric := range nonSigningRate.Data {
nonSigningMetrics = append(nonSigningMetrics, &ejector.NonSignerMetric{
OperatorId: metric.OperatorId,
OperatorAddress: metric.OperatorAddress,
QuorumId: metric.QuorumId,
TotalUnsignedBatches: metric.TotalUnsignedBatches,
Percentage: metric.Percentage,
StakePercentage: metric.StakePercentage,
})
}
ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningMetrics, mode)
}
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.Internal)
errorResponse(c, err)
return
}
s.metrics.IncrementSuccessfulRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.OK)
c.JSON(http.StatusOK, ejectionResponse)
}

Expand Down
7 changes: 4 additions & 3 deletions disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/operators/ejector"
sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestEjectOperatorHandler(t *testing.T) {
data, err := io.ReadAll(res.Body)
assert.NoError(t, err)

var response dataapi.EjectionResponse
var response ejector.EjectionResponse
err = json.Unmarshal(data, &response)
assert.NoError(t, err)
assert.NotNil(t, response)
Expand Down Expand Up @@ -455,14 +456,14 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) {
type ejectorComponents struct {
wallet *sdkmock.MockWallet
ethClient *commonmock.MockEthClient
ejector *dataapi.Ejector
ejector *ejector.Ejector
}

func getEjector(t *testing.T) *ejectorComponents {
ctrl := gomock.NewController(t)
w := sdkmock.NewMockWallet(ctrl)
ethClient := &commonmock.MockEthClient{}
ejector := dataapi.NewEjector(w, ethClient, mockLogger, mockTx, metrics, 100*time.Millisecond, -1)
ejector := ejector.NewEjector(w, ethClient, mockLogger, mockTx, metrics.EjectorMetrics, 100*time.Millisecond, -1)
return &ejectorComponents{
wallet: w,
ethClient: ethClient,
Expand Down
Loading
Loading