diff --git a/common/ratelimit.go b/common/ratelimit.go index 5d4950019e..64534ec6fb 100644 --- a/common/ratelimit.go +++ b/common/ratelimit.go @@ -15,17 +15,22 @@ import ( // ID is the authenticated Account ID. For retrieval requests, the requester ID will be the requester's IP address. type RequesterID = string +// RequesterName is the friendly name of the party making the request. In the case +// of a rollup making a dispersal request, the RequesterName is the name of the rollup. +type RequesterName = string + type RequestParams struct { - RequesterID RequesterID - BlobSize uint - Rate RateParam - Info interface{} + RequesterID RequesterID + RequesterName RequesterName + BlobSize uint + Rate RateParam + Info interface{} } type RateLimiter interface { // AllowRequest checks whether the request should be allowed. If the request is allowed, the function returns true. // If the request is not allowed, the function returns false and the RequestParams of the request that was not allowed. - // In order to for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. + // In order for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. // Each RequestParams object represents a single request. Each request is subjected to the same GlobalRateParams, but the // individual parameters of the request can differ. // @@ -37,7 +42,7 @@ type RateLimiter interface { type GlobalRateParams struct { // BucketSizes are the time scales at which the rate limit is enforced. - // For each time scale, the rate limiter will make sure that the give rate (possibly subject to a relaxation given + // For each time scale, the rate limiter will make sure that the given rate (possibly subject to a relaxation given // by one of the Multipliers) is observed when the request bandwidth is averaged at this time scale. // In terms of implementation, the rate limiter uses a set of "time buckets". A time bucket, i, is filled to a maximum of // `BucketSizes[i]` at a rate of 1, and emptied by an amount equal to `(size of request)/RateParam` each time a diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 08387cf446..f522f9b64a 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -2,10 +2,13 @@ package ratelimit import ( "context" + "strconv" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type BucketStore = common.KVStore[common.RateBucketParams] @@ -15,13 +18,20 @@ type rateLimiter struct { bucketStore BucketStore logger logging.Logger + + // Prometheus metrics + bucketLevels *prometheus.GaugeVec } -func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { +func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { return &rateLimiter{ globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), + bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "requester_name", "bucket_index"}), } } @@ -109,7 +119,18 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - d.logger.Debug("Bucket level", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + d.logger.Debug("Bucket level updated", "key", params.RequesterID, "name", params.RequesterName, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + + // Update metrics only if the requester name is provided. We're making + // an assumption that the requester name is only provided for authenticated + // requests so it should limit the cardinality of the requester_id label. + if params.RequesterName != "" { + d.bucketLevels.With(prometheus.Labels{ + "requester_id": params.RequesterID, + "requester_name": params.RequesterName, + "bucket_index": strconv.Itoa(i), + }).Set(float64(bucketParams.BucketLevels[i])) + } } return allowed, bucketParams diff --git a/common/ratelimit/ratelimit_test.go b/common/ratelimit/ratelimit_test.go index 0555f2e748..969d698dbf 100644 --- a/common/ratelimit/ratelimit_test.go +++ b/common/ratelimit/ratelimit_test.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/common/ratelimit" "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -25,7 +26,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) { return nil, err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logging.NewNoopLogger()) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logging.NewNoopLogger()) return ratelimiter, nil diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 640758c3a7..e604b2a3ae 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -355,11 +355,14 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu rates.BlobRate = rateInfo.BlobRate } + if len(rateInfo.Name) > 0 { + rates.Name = rateInfo.Name + } + break } return rates, key, nil - } // Enum of rateTypes for the limiterInfo struct @@ -446,6 +449,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context s.metrics.HandleInternalFailureRpcRequest(apiMethodName) return api.NewInternalError(err.Error()) } + + // Note: There's an implicit assumption that an empty name means the account + // is not in the allow list. requesterName = accountRates.Name // Update the quorum rate @@ -458,9 +464,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context // System Level key := fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: globalRates.TotalUnauthThroughput, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: encodedSize, + Rate: globalRates.TotalUnauthThroughput, Info: limiterInfo{ RateType: SystemThroughputType, QuorumID: param.QuorumID, @@ -469,9 +476,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: blobRateMultiplier, - Rate: globalRates.TotalUnauthBlobRate, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: blobRateMultiplier, + Rate: globalRates.TotalUnauthBlobRate, Info: limiterInfo{ RateType: SystemBlobRateType, QuorumID: param.QuorumID, @@ -481,9 +489,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context // Account Level key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: accountRates.Throughput, + RequesterID: key, + RequesterName: requesterName, + BlobSize: encodedSize, + Rate: accountRates.Throughput, Info: limiterInfo{ RateType: AccountThroughputType, QuorumID: param.QuorumID, @@ -492,9 +501,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: blobRateMultiplier, - Rate: accountRates.BlobRate, + RequesterID: key, + RequesterName: requesterName, + BlobSize: blobRateMultiplier, + Rate: accountRates.BlobRate, Info: limiterInfo{ RateType: AccountBlobRateType, QuorumID: param.QuorumID, diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index d86beae396..b4c90c93de 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -20,6 +20,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -641,7 +642,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create bucket store") } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ QuorumRateInfos: map[core.QuorumID]apiserver.QuorumRateInfo{ @@ -662,20 +663,24 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { Allowlist: apiserver.Allowlist{ "1.2.3.4": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, }, "0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, @@ -693,7 +698,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics("9001", logger), ratelimiter, rateConfig) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index f8a8ac037f..e6fa92df06 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" @@ -91,6 +92,8 @@ func RunDisperserServer(ctx *cli.Context) error { blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) + reg := prometheus.NewRegistry() + var ratelimiter common.RateLimiter if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams @@ -108,12 +111,19 @@ func RunDisperserServer(ctx *cli.Context) error { return err } } - ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } - // TODO: create a separate metrics for batcher - metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger) - server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, transactor, logger, metrics, ratelimiter, config.RateConfig) + metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger) + server := apiserver.NewDispersalServer( + config.ServerConfig, + blobStore, + transactor, + logger, + metrics, + ratelimiter, + config.RateConfig, + ) // Enable Metrics Block if config.MetricsConfig.EnableMetrics { diff --git a/disperser/metrics.go b/disperser/metrics.go index f9ed4950b1..6a762344d9 100644 --- a/disperser/metrics.go +++ b/disperser/metrics.go @@ -37,9 +37,8 @@ const ( AccountRateLimitedFailure string = "ratelimited-account" // The request rate limited at account level ) -func NewMetrics(httpPort string, logger logging.Logger) *Metrics { +func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics { namespace := "eigenda_disperser" - reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) diff --git a/node/cmd/main.go b/node/cmd/main.go index 1fe3a9564c..00b959213d 100644 --- a/node/cmd/main.go +++ b/node/cmd/main.go @@ -8,12 +8,13 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/pubip" + "github.com/Layr-Labs/eigenda/common/ratelimit" + "github.com/Layr-Labs/eigenda/common/store" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/ratelimit" - "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigenda/node/flags" "github.com/Layr-Labs/eigenda/node/grpc" @@ -56,18 +57,8 @@ func NodeMain(ctx *cli.Context) error { pubIPProvider := pubip.ProviderOrDefault(config.PubIPProvider) - // Create the node. - node, err := node.NewNode(config, pubIPProvider, logger) - if err != nil { - return err - } - - err = node.Start(context.Background()) - if err != nil { - node.Logger.Error("could not start node", "error", err) - return err - } - + // Rate limiter + reg := prometheus.NewRegistry() globalParams := common.GlobalRateParams{ BucketSizes: []time.Duration{bucketDuration}, Multipliers: []float32{bucketMultiplier}, @@ -79,7 +70,19 @@ func NodeMain(ctx *cli.Context) error { return err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) + + // Create the node. + node, err := node.NewNode(reg, config, pubIPProvider, logger) + if err != nil { + return err + } + + err = node.Start(context.Background()) + if err != nil { + node.Logger.Error("could not start node", "error", err) + return err + } // Creates the GRPC server. server := grpc.NewServer(config, node, logger, ratelimiter) diff --git a/node/node.go b/node/node.go index a39f6dcc8b..1f93085162 100644 --- a/node/node.go +++ b/node/node.go @@ -66,16 +66,15 @@ type Node struct { } // NewNode creates a new Node with the provided config. -func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { +func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { // Setup metrics // sdkClients, err := buildSdkClients(config, logger) // if err != nil { // return nil, err // } - promReg := prometheus.NewRegistry() - eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics")) - rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) + eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics")) + rpcCallsCollector := rpccalls.NewCollector(AppName, reg) // Generate BLS keys keyPair, err := core.MakeKeyPairFromString(config.PrivateBls) @@ -113,7 +112,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) - metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) + metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) diff --git a/test/integration_test.go b/test/integration_test.go index fc5c8c4661..c75afeee0b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -168,7 +168,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } finalizer := batchermock.NewFinalizer() - disperserMetrics := disperser.NewMetrics("9100", logger) + disperserMetrics := disperser.NewMetrics(prometheus.NewRegistry(), "9100", logger) txnManager := batchermock.NewTxnManager() batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)