Skip to content

Commit

Permalink
Periodically reload allowlist from file (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored May 9, 2024
1 parent 83ef8bd commit a4ae98e
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 130 deletions.
168 changes: 69 additions & 99 deletions disperser/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,27 @@ package apiserver
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"strings"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/urfave/cli"
)

const (
RegisteredQuorumFlagName = "auth.registered-quorum"
TotalUnauthThroughputFlagName = "auth.total-unauth-byte-rate"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-byte-rate"
TotalUnauthBlobRateFlagName = "auth.total-unauth-blob-rate"
PerUserUnauthBlobRateFlagName = "auth.per-user-unauth-blob-rate"
ClientIPHeaderFlagName = "auth.client-ip-header"
AllowlistFlagName = "auth.allowlist"
AllowlistFileFlagName = "auth.allowlist-file"
RegisteredQuorumFlagName = "auth.registered-quorum"
TotalUnauthThroughputFlagName = "auth.total-unauth-byte-rate"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-byte-rate"
TotalUnauthBlobRateFlagName = "auth.total-unauth-blob-rate"
PerUserUnauthBlobRateFlagName = "auth.per-user-unauth-blob-rate"
ClientIPHeaderFlagName = "auth.client-ip-header"
AllowlistFileFlagName = "auth.allowlist-file"
AllowlistRefreshIntervalFlagName = "auth.allowlist-refresh-interval"

RetrievalBlobRateFlagName = "auth.retrieval-blob-rate"
RetrievalThroughputFlagName = "auth.retrieval-throughput"
Expand Down Expand Up @@ -62,18 +63,11 @@ type RateConfig struct {

RetrievalBlobRate common.RateParam
RetrievalThroughput common.RateParam
}

// Deprecated: use AllowlistFileFlagName instead
func AllowlistFlag(envPrefix string) cli.Flag {
return cli.StringSliceFlag{
Name: AllowlistFlagName,
Usage: "Allowlist of IPs or ethereum addresses (including initial \"0x\") and corresponding blob/byte rates to bypass rate limiting. Format: [<IP>||<ETH ADDRESS>]/<quorum ID>/<blob rate>/<byte rate>. Example: 127.0.0.1/0/10/10485760",
EnvVar: common.PrefixEnvVar(envPrefix, "ALLOWLIST"),
Required: false,
Value: &cli.StringSlice{},
}
AllowlistFile string
AllowlistRefreshInterval time.Duration
}

func AllowlistFileFlag(envPrefix string) cli.Flag {
return cli.StringFlag{
Name: AllowlistFileFlagName,
Expand Down Expand Up @@ -122,8 +116,14 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: "",
EnvVar: common.PrefixEnvVar(envPrefix, "CLIENT_IP_HEADER"),
},
AllowlistFlag(envPrefix),
AllowlistFileFlag(envPrefix),
cli.DurationFlag{
Name: AllowlistRefreshIntervalFlagName,
Usage: "The interval at which to refresh the allowlist from the file",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "ALLOWLIST_REFRESH_INTERVAL"),
Value: 5 * time.Minute,
},
cli.IntFlag{
Name: RetrievalBlobRateFlagName,
Usage: "The blob rate limit for retrieval requests (Blobs/sec)",
Expand All @@ -139,90 +139,50 @@ func CLIFlags(envPrefix string) []cli.Flag {
}
}

func parseAllowlistEntry(c *cli.Context) Allowlist {
// Parse from AllowlistFlagName
// Remove when AllowlistFlagName is deprecated and no longer used
func ReadAllowlistFromFile(f string) (Allowlist, error) {
allowlist := make(Allowlist)
for _, allowlistEntry := range c.StringSlice(AllowlistFlagName) {
allowlistEntrySplit := strings.Split(allowlistEntry, "/")
if len(allowlistEntrySplit) != 4 {
log.Printf("invalid allowlist entry: entry should contain exactly 4 elements: %s", allowlistEntry)
continue
}
ip := allowlistEntrySplit[0]
quorumID, err := strconv.Atoi(allowlistEntrySplit[1])
if err != nil {
log.Printf("invalid allowlist entry: failed to convert quorum ID from string: %s", allowlistEntry)
continue
}
blobRate, err := strconv.ParseFloat(allowlistEntrySplit[2], 64)
if err != nil {
log.Printf("invalid allowlist entry: failed to convert blob rate from string: %s", allowlistEntry)
continue
}
byteRate, err := strconv.ParseFloat(allowlistEntrySplit[3], 64)
if err != nil {
log.Printf("invalid allowlist entry: failed to convert throughput from string: %s", allowlistEntry)
continue
}
rateInfoByQuorum, ok := allowlist[ip]
if !ok {
allowlist[ip] = map[core.QuorumID]PerUserRateInfo{
core.QuorumID(quorumID): {
Throughput: common.RateParam(byteRate),
BlobRate: common.RateParam(blobRate * blobRateMultiplier),
},
}
} else {
rateInfoByQuorum[core.QuorumID(quorumID)] = PerUserRateInfo{
Throughput: common.RateParam(byteRate),
BlobRate: common.RateParam(blobRate * blobRateMultiplier),
}
}
if f == "" {
return allowlist, nil
}

// Parse from AllowlistFileFlagName
allowlistFileName := c.String(AllowlistFileFlagName)
if allowlistFileName != "" {
allowlistFile, err := os.Open(allowlistFileName)
if err != nil {
log.Printf("failed to read allowlist file: %s", err)
return allowlist
}
defer allowlistFile.Close()
var allowlistEntries []AllowlistEntry
content, err := io.ReadAll(allowlistFile)
if err != nil {
log.Printf("failed to load allowlist file content: %s", err)
return allowlist
}
err = json.Unmarshal(content, &allowlistEntries)
if err != nil {
log.Printf("failed to parse allowlist file content: %s", err)
return allowlist
}
allowlistFile, err := os.Open(f)
if err != nil {
log.Printf("failed to read allowlist file: %s", err)
return allowlist, err
}
defer allowlistFile.Close()
var allowlistEntries []AllowlistEntry
content, err := io.ReadAll(allowlistFile)
if err != nil {
log.Printf("failed to load allowlist file content: %s", err)
return allowlist, err
}
err = json.Unmarshal(content, &allowlistEntries)
if err != nil {
log.Printf("failed to parse allowlist file content: %s", err)
return allowlist, err
}

for _, entry := range allowlistEntries {
rateInfoByQuorum, ok := allowlist[entry.Account]
if !ok {
allowlist[entry.Account] = map[core.QuorumID]PerUserRateInfo{
core.QuorumID(entry.QuorumID): {
Name: entry.Name,
Throughput: common.RateParam(entry.ByteRate),
BlobRate: common.RateParam(entry.BlobRate * blobRateMultiplier),
},
}
} else {
rateInfoByQuorum[core.QuorumID(entry.QuorumID)] = PerUserRateInfo{
for _, entry := range allowlistEntries {
rateInfoByQuorum, ok := allowlist[entry.Account]
if !ok {
allowlist[entry.Account] = map[core.QuorumID]PerUserRateInfo{
core.QuorumID(entry.QuorumID): {
Name: entry.Name,
Throughput: common.RateParam(entry.ByteRate),
BlobRate: common.RateParam(entry.BlobRate * blobRateMultiplier),
}
},
}
} else {
rateInfoByQuorum[core.QuorumID(entry.QuorumID)] = PerUserRateInfo{
Name: entry.Name,
Throughput: common.RateParam(entry.ByteRate),
BlobRate: common.RateParam(entry.BlobRate * blobRateMultiplier),
}
}
}

return allowlist
return allowlist, nil
}

func ReadCLIConfig(c *cli.Context) (RateConfig, error) {
Expand Down Expand Up @@ -261,13 +221,23 @@ func ReadCLIConfig(c *cli.Context) (RateConfig, error) {
}
}

allowlist := parseAllowlistEntry(c)
allowlist := make(Allowlist)
allowlistFileName := c.String(AllowlistFileFlagName)
if allowlistFileName != "" {
var err error
allowlist, err = ReadAllowlistFromFile(allowlistFileName)
if err != nil {
return RateConfig{}, fmt.Errorf("failed to read allowlist file %s: %w", allowlistFileName, err)
}
}

return RateConfig{
QuorumRateInfos: quorumRateInfos,
ClientIPHeader: c.String(ClientIPHeaderFlagName),
Allowlist: allowlist,
RetrievalBlobRate: common.RateParam(c.Int(RetrievalBlobRateFlagName) * blobRateMultiplier),
RetrievalThroughput: common.RateParam(c.Int(RetrievalThroughputFlagName)),
QuorumRateInfos: quorumRateInfos,
ClientIPHeader: c.String(ClientIPHeaderFlagName),
Allowlist: allowlist,
RetrievalBlobRate: common.RateParam(c.Int(RetrievalBlobRateFlagName) * blobRateMultiplier),
RetrievalThroughput: common.RateParam(c.Int(RetrievalThroughputFlagName)),
AllowlistFile: c.String(AllowlistFileFlagName),
AllowlistRefreshInterval: c.Duration(AllowlistRefreshIntervalFlagName),
}, nil
}
26 changes: 26 additions & 0 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func NewDispersalServer(
logger.Info("[Allowlist]", "account", account, "name", rateInfo.Name, "quorumID", quorumID, "throughput", rateInfo.Throughput, "blobRate", rateInfo.BlobRate)
}
}
logger.Info("allowlist config", "file", rateConfig.AllowlistFile, "refreshInterval", rateConfig.AllowlistRefreshInterval.String())

authenticator := auth.NewAuthenticator(auth.AuthConfig{})

Expand Down Expand Up @@ -752,7 +753,23 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
}, nil
}

func (s *DispersalServer) GetRateConfig() *RateConfig {
return &s.rateConfig
}

func (s *DispersalServer) Start(ctx context.Context) error {
go func() {
t := time.NewTicker(s.rateConfig.AllowlistRefreshInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
s.LoadAllowlist()
}
}
}()
// Serve grpc requests
addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.serverConfig.GrpcPort)
listener, err := net.Listen("tcp", addr)
Expand All @@ -778,6 +795,15 @@ func (s *DispersalServer) Start(ctx context.Context) error {
return nil
}

func (s *DispersalServer) LoadAllowlist() {
al, err := ReadAllowlistFromFile(s.rateConfig.AllowlistFile)
if err != nil {
s.logger.Error("failed to load allowlist", "err", err)
return
}
s.rateConfig.Allowlist = al
}

// updateQuorumConfig updates the quorum config and returns the updated quorum config. If the update fails,
// it will fallback to the old quorumConfig if it is set. This is to improve the robustness of the disperser to
// RPC failures since the quorum config is rarely updated. In the event that quorumConfig is incorrect, this will
Expand Down
Loading

0 comments on commit a4ae98e

Please sign in to comment.