Skip to content

Commit

Permalink
Add blob ratelimit (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Dec 20, 2023
1 parent 6593001 commit 837398e
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 20 deletions.
2 changes: 1 addition & 1 deletion disperser/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ run_encoder: build_encoder
--kzg.srs-order 3000 \
--kzg.num-workers 12 \
--disperser-encoder.log.level-std trace \
--disperser-encoder.log.level-file trace
--disperser-encoder.log.level-file trace
60 changes: 54 additions & 6 deletions disperser/apiserver/rate_config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
package apiserver

import (
"fmt"
"strconv"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/urfave/cli"
)

const (
RegisteredQuorumFlagName = "auth.registered-quorum"
TotalUnauthThroughputFlagName = "auth.total-unauth-throughput"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-throughput"
TotalUnauthThroughputFlagName = "auth.total-unauth-byte-rate"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-byte-rate"
TotalUnauthBlobRateFlagName = "auth.total-unauth-blob-rate"
PerUserUnauthBlobRateFlagName = "auth.per-user-unauth-blob-rate"
ClientIPHeaderFlagName = "auth.client-ip-header"

// We allow the user to specify the blob rate in blobs/sec, but internally we use blobs/sec * 1e6 (i.e. blobs/microsec).
// This is because the rate limiter takes an integer rate.
blobRateMultiplier = 1e6
)

type QuorumRateInfo struct {
PerUserUnauthThroughput common.RateParam
TotalUnauthThroughput common.RateParam
PerUserUnauthBlobRate common.RateParam
TotalUnauthBlobRate common.RateParam
}

type RateConfig struct {
Expand All @@ -35,13 +46,25 @@ func CLIFlags(envPrefix string) []cli.Flag {
Name: TotalUnauthThroughputFlagName,
Usage: "Total encoded throughput for unauthenticated requests (Bytes/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_THROUGHPUT"),
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_BYTE_RATE"),
},
cli.IntSliceFlag{
Name: PerUserUnauthThroughputFlagName,
Usage: "Per-user encoded throughput for unauthenticated requests (Bytes/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_THROUGHPUT"),
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_BYTE_RATE"),
},
cli.StringSliceFlag{
Name: TotalUnauthBlobRateFlagName,
Usage: "Total blob rate for unauthenticated requests (Blobs/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_BLOB_RATE"),
},
cli.StringSliceFlag{
Name: PerUserUnauthBlobRateFlagName,
Usage: "Per-user blob interval for unauthenticated requests (Blobs/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_BLOB_RATE"),
},
cli.StringFlag{
Name: ClientIPHeaderFlagName,
Expand All @@ -53,19 +76,44 @@ func CLIFlags(envPrefix string) []cli.Flag {
}
}

func ReadCLIConfig(c *cli.Context) RateConfig {
func ReadCLIConfig(c *cli.Context) (RateConfig, error) {

numQuorums := len(c.IntSlice(RegisteredQuorumFlagName))
if len(c.StringSlice(TotalUnauthBlobRateFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of total unauth blob rates does not match number of quorums")
}
if len(c.StringSlice(PerUserUnauthBlobRateFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of per user unauth blob intervals does not match number of quorums")
}
if len(c.IntSlice(TotalUnauthThroughputFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of total unauth throughput does not match number of quorums")
}
if len(c.IntSlice(PerUserUnauthThroughputFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of per user unauth throughput does not match number of quorums")
}

quorumRateInfos := make(map[core.QuorumID]QuorumRateInfo)
for ind, quorumID := range c.IntSlice(RegisteredQuorumFlagName) {

totalBlobRate, err := strconv.ParseFloat(c.StringSlice(TotalUnauthBlobRateFlagName)[ind], 64)
if err != nil {
return RateConfig{}, err
}
accountBlobRate, err := strconv.ParseFloat(c.StringSlice(PerUserUnauthBlobRateFlagName)[ind], 64)
if err != nil {
return RateConfig{}, err
}

quorumRateInfos[core.QuorumID(quorumID)] = QuorumRateInfo{
TotalUnauthThroughput: common.RateParam(c.IntSlice(TotalUnauthThroughputFlagName)[ind]),
PerUserUnauthThroughput: common.RateParam(c.IntSlice(PerUserUnauthThroughputFlagName)[ind]),
TotalUnauthBlobRate: common.RateParam(totalBlobRate * blobRateMultiplier),
PerUserUnauthBlobRate: common.RateParam(accountBlobRate * blobRateMultiplier),
}
}

return RateConfig{
QuorumRateInfos: quorumRateInfos,
ClientIPHeader: c.String(ClientIPHeaderFlagName),
}
}, nil
}
26 changes: 24 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,22 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("system ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
s.logger.Warn("system byte ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
return errSystemRateLimit
}

systemQuorumKey = fmt.Sprintf("%s:%d-blobrate", systemAccountKey, param.QuorumID)
allowed, err = s.ratelimiter.AllowRequest(ctx, systemQuorumKey, blobRateMultiplier, rates.TotalUnauthBlobRate)
if err != nil {
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("system blob ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", float32(rates.TotalUnauthBlobRate)/blobRateMultiplier)
return errSystemRateLimit
}

// Check Account Ratelimit

blob.RequestHeader.AccountID = "ip:" + origin

userQuorumKey := fmt.Sprintf("%s:%d", blob.RequestHeader.AccountID, param.QuorumID)
Expand All @@ -213,7 +225,17 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("account ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", rates.PerUserUnauthThroughput)
s.logger.Warn("account byte ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", rates.PerUserUnauthThroughput)
return errAccountRateLimit
}

userQuorumKey = fmt.Sprintf("%s:%d-blobrate", blob.RequestHeader.AccountID, param.QuorumID)
allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, blobRateMultiplier, rates.PerUserUnauthBlobRate)
if err != nil {
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("account blob ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", float32(rates.PerUserUnauthBlobRate)/blobRateMultiplier)
return errAccountRateLimit
}

Expand Down
7 changes: 6 additions & 1 deletion disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func NewConfig(ctx *cli.Context) (Config, error) {
return Config{}, err
}

rateConfig, err := apiserver.ReadCLIConfig(ctx)
if err != nil {
return Config{}, err
}

config := Config{
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
ServerConfig: disperser.ServerConfig{
Expand All @@ -51,7 +56,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
RatelimiterConfig: ratelimiterConfig,
RateConfig: apiserver.ReadCLIConfig(ctx),
RateConfig: rateConfig,
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
BucketTableName: ctx.GlobalString(flags.BucketTableName.Name),
BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name),
Expand Down
10 changes: 6 additions & 4 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath,
DISPERSER_SERVER_PRIVATE_KEY: "123",
DISPERSER_SERVER_NUM_CONFIRMATIONS: "0",

DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0",
DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT: "10000000",
DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT: "32000",
DISPERSER_SERVER_ENABLE_RATELIMITER: "true",
DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0",
DISPERSER_SERVER_TOTAL_UNAUTH_BYTE_RATE: "10000000",
DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE: "32000",
DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE: "10",
DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE: "2",
DISPERSER_SERVER_ENABLE_RATELIMITER: "true",

DISPERSER_SERVER_BUCKET_SIZES: "5s",
DISPERSER_SERVER_BUCKET_MULTIPLIERS: "1",
Expand Down
14 changes: 12 additions & 2 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type DisperserVars struct {

DISPERSER_SERVER_BUCKET_STORE_SIZE string

DISPERSER_SERVER_ALLOWLIST string

DISPERSER_SERVER_AWS_REGION string

DISPERSER_SERVER_AWS_ACCESS_KEY_ID string
Expand All @@ -53,9 +55,13 @@ type DisperserVars struct {

DISPERSER_SERVER_REGISTERED_QUORUM_ID string

DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT string
DISPERSER_SERVER_TOTAL_UNAUTH_BYTE_RATE string

DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE string

DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE string

DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT string
DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE string

DISPERSER_SERVER_CLIENT_IP_HEADER string
}
Expand Down Expand Up @@ -303,6 +309,10 @@ type RetrieverVars struct {

RETRIEVER_METRICS_HTTP_PORT string

RETRIEVER_GRAPH_URL string

RETRIEVER_USE_GRAPH string

RETRIEVER_G1_PATH string

RETRIEVER_G2_PATH string
Expand Down
2 changes: 1 addition & 1 deletion inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
NumWorker: 1,
SRSOrder: uint64(srsOrder),
Verbose: true,
PreloadEncoder: true,
PreloadEncoder: false,
},
})
if err != nil {
Expand Down
33 changes: 30 additions & 3 deletions inabox/tests/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type result struct {
err error
}

func dispserse(t *testing.T, ctx context.Context, client traffic.DisperserClient, resultChan chan result, data []byte, param core.SecurityParam) {
func disperse(t *testing.T, ctx context.Context, client traffic.DisperserClient, resultChan chan result, data []byte, param core.SecurityParam) {

blobStatus, key, err := client.DisperseBlob(ctx, data, param.QuorumID, param.QuorumThreshold, param.AdversaryThreshold)
if err != nil {
Expand Down Expand Up @@ -127,7 +127,7 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase)
go func() {
for i := 0; i < c.numDispersal; i++ {
<-dispersalTicker.C
go dispserse(t, ctx, disp, resultChan, data, c.param)
go disperse(t, ctx, disp, resultChan, data, c.param)
}
}()

Expand Down Expand Up @@ -181,7 +181,7 @@ func TestRatelimit(t *testing.T) {
}
testConfig := deploy.NewTestConfig(testname, rootPath)

if testConfig.Dispersers[0].DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT != fmt.Sprint(perUserThroughput) {
if testConfig.Dispersers[0].DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE != fmt.Sprint(perUserThroughput) {
t.Fatalf("per user throughput should be %v", perUserThroughput)
}
if testConfig.Dispersers[0].DISPERSER_SERVER_BUCKET_MULTIPLIERS != fmt.Sprint(dispersalMultiplier) {
Expand Down Expand Up @@ -281,4 +281,31 @@ func TestRatelimit(t *testing.T) {

})

t.Run("ratelimiting when dispersing greater than blob rate", func(t *testing.T) {

t.Skip("Manual test for now")

testCase := ratelimitTestCase{
numDispersal: 200,
numRetrieval: 0,
dispersalInterval: 450 * time.Millisecond,
retrievalInterval: 500 * time.Millisecond,
pause: 0,
blobSize: 5,
param: core.SecurityParam{
QuorumID: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}

dispersalErrors, retrievalErrors := testRatelimit(t, testConfig, testCase)

fmt.Println("Dispersal Ratelimited: ", dispersalErrors)

assert.Greater(t, dispersalErrors, 0)
assert.Equal(t, 0, retrievalErrors)

})

}

0 comments on commit 837398e

Please sign in to comment.