Skip to content

Commit

Permalink
validate quorum ids
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 13, 2023
1 parent 0e8fd8d commit 1652fa8
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 46 deletions.
45 changes: 30 additions & 15 deletions churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"errors"
"math/big"
"sync"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -42,11 +43,13 @@ type ChurnResponse struct {
}

type churner struct {
Indexer thegraph.IndexedChainState
Transactor core.Transactor
StakeRegistryAddress gethcommon.Address
privateKey *ecdsa.PrivateKey
logger common.Logger
mu sync.Mutex
Indexer thegraph.IndexedChainState
Transactor core.Transactor
QuorumCount uint16

privateKey *ecdsa.PrivateKey
logger common.Logger
}

func NewChurner(
Expand All @@ -55,22 +58,18 @@ func NewChurner(
transactor core.Transactor,
logger common.Logger,
) (*churner, error) {
stakeRegistryAddress, err := transactor.StakeRegistry(context.Background())
if err != nil {
return nil, err
}

privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString)
if err != nil {
return nil, err
}

return &churner{
Indexer: indexer,
Transactor: transactor,
StakeRegistryAddress: stakeRegistryAddress,
privateKey: privateKey,
logger: logger,
Indexer: indexer,
Transactor: transactor,
QuorumCount: 0,

privateKey: privateKey,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -121,6 +120,22 @@ func (c *churner) ProcessChurnRequest(ctx context.Context, operatorToRegisterAdd
return c.createChurnResponse(ctx, operatorToRegisterId, operatorToRegisterAddress, churnRequest.QuorumIDs)
}

func (c *churner) UpdateQuorumCount(ctx context.Context) error {
currentBlock, err := c.Transactor.GetCurrentBlockNumber(ctx)
if err != nil {
return err
}
count, err := c.Transactor.GetQuorumCount(ctx, currentBlock)
if err != nil {
return err
}

c.mu.Lock()
c.QuorumCount = count
c.mu.Unlock()
return nil
}

func (c *churner) createChurnResponse(
ctx context.Context,
operatorToRegisterId core.OperatorID,
Expand Down
32 changes: 19 additions & 13 deletions churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ import (
"github.com/Layr-Labs/eigenda/core"
)

var (
// the signature with the lastest expiry
latestExpiry = int64(0)
)

type Server struct {
pb.UnimplementedChurnerServer

config *Config
churner *churner
config *Config
churner *churner
// the signature with the lastest expiry
latestExpiry int64
lastRequestTimeByOperatorID map[core.OperatorID]time.Time
logger common.Logger

logger common.Logger
}

func NewServer(
Expand All @@ -32,6 +30,7 @@ func NewServer(
return &Server{
config: config,
churner: churner,
latestExpiry: int64(0),
lastRequestTimeByOperatorID: make(map[core.OperatorID]time.Time),
logger: logger,
}
Expand All @@ -47,13 +46,20 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

now := time.Now()
// check that we are after the previous approval's expiry
if now.Unix() < latestExpiry {
return nil, fmt.Errorf("previous approval not expired, retry in %d", latestExpiry-now.Unix())
if now.Unix() < s.latestExpiry {
return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())
}

for quorumID := range req.GetQuorumIds() {
if quorumID > 255 {
return nil, fmt.Errorf("Invalid request: quorum ID must be in range [0, 255], but found %d", quorumID)
if quorumID >= int(s.churner.QuorumCount) {
err := s.churner.UpdateQuorumCount(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get onchain quorum count: %w", err)
}

if quorumID >= int(s.churner.QuorumCount) {
return nil, fmt.Errorf("Invalid request: the quorum_id must be in range [0, %d], but found %d", s.churner.QuorumCount-1, quorumID)
}
}
}

Expand All @@ -76,7 +82,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
}

// update the latest expiry
latestExpiry = response.SignatureWithSaltAndExpiry.Expiry.Int64()
s.latestExpiry = response.SignatureWithSaltAndExpiry.Expiry.Int64()

operatorsToChurn := convertToOperatorsToChurnGrpc(response.OperatorsToChurn)

Expand Down
37 changes: 37 additions & 0 deletions churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,50 @@ func TestChurn(t *testing.T) {
assert.Equal(t, operatorAddr.Bytes(), param.GetOperator())
assert.Equal(t, keyPair.PubKey.Serialize(), param.GetPubkey())
}

// retry prior to expiry should fail
_, err = s.Churn(ctx, request)
assert.ErrorContains(t, err, "previous approval not expired, retry in")
}

func TestChurnWithInvalidQuorum(t *testing.T) {
s := newTestServer(t)
ctx := context.Background()

salt := crypto.Keccak256([]byte(operatorToChurnInPrivateKeyHex), []byte("ChurnRequest"))
request := &pb.ChurnRequest{
OperatorToRegisterPubkeyG1: keyPair.PubKey.Serialize(),
OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2().Serialize(),
Salt: salt,
QuorumIds: []uint32{0, 1},
}

var requestHash [32]byte
requestHashBytes := crypto.Keccak256(
[]byte("ChurnRequest"),
request.OperatorToRegisterPubkeyG1,
request.OperatorToRegisterPubkeyG2,
request.Salt,
)
copy(requestHash[:], requestHashBytes)

signature := keyPair.SignMessage(requestHash)
request.OperatorRequestSignature = signature.Serialize()

mockIndexer.On("GetIndexedOperatorInfoByOperatorId").Return(&core.IndexedOperatorInfo{
PubkeyG1: keyPair.PubKey,
}, nil)

_, err := s.Churn(ctx, request)
assert.ErrorContains(t, err, "Invalid request: the quorum_id must be in range [0, 0], but found 1")
}

func setupMockTransactor() {
transactorMock.On("StakeRegistry").Return(gethcommon.HexToAddress("0x0000000000000000000000000000000000000001"), nil).Once()
transactorMock.On("OperatorIDToAddress").Return(operatorAddr, nil)
transactorMock.On("GetCurrentQuorumBitmapByOperatorId").Return(big.NewInt(2), nil)
transactorMock.On("GetCurrentBlockNumber").Return(uint32(2), nil)
transactorMock.On("GetQuorumCount").Return(uint16(1), nil)
transactorMock.On("GetOperatorStakesForQuorums").Return([][]dacore.OperatorStake{
{
{
Expand Down
7 changes: 7 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,13 @@ func (t *Transactor) GetCurrentBlockNumber(ctx context.Context) (uint32, error)
return t.EthClient.GetCurrentBlockNumber(ctx)
}

func (t *Transactor) GetQuorumCount(ctx context.Context, blockNumber uint32) (uint16, error) {
return t.Bindings.StakeRegistry.QuorumCount(&bind.CallOpts{
Context: ctx,
BlockNumber: big.NewInt(int64(blockNumber)),
})
}

func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDAServiceManagerAddr gethcommon.Address) error {
contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(eigenDAServiceManagerAddr, t.EthClient)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ func (t *MockTransactor) GetCurrentBlockNumber(ctx context.Context) (uint32, err
return result.(uint32), args.Error(1)
}

func (t *MockTransactor) GetQuorumCount(ctx context.Context, blockNumber uint32) (uint16, error) {
args := t.Called()
result := args.Get(0)
return result.(uint16), args.Error(1)
}

func (t *MockTransactor) PubkeyHashToOperator(ctx context.Context, operatorId core.OperatorID) (gethcommon.Address, error) {
args := t.Called()
result := args.Get(0)
Expand Down
23 changes: 13 additions & 10 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,30 @@ type Transactor interface {
// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail.
ConfirmBatch(ctx context.Context, batchHeader BatchHeader, quorums map[QuorumID]*QuorumResult, signatureAggregation SignatureAggregation) (*types.Receipt, error)

// Returns the BLOCK_STALE_MEASURE defined onchain.
// GetBlockStaleMeasure returns the BLOCK_STALE_MEASURE defined onchain.
GetBlockStaleMeasure(ctx context.Context) (uint32, error)
// Returns the STORE_DURATION_BLOCKS defined onchain.
// GetStoreDurationBlocks returns the STORE_DURATION_BLOCKS defined onchain.
GetStoreDurationBlocks(ctx context.Context) (uint32, error)

// Returns the address of the stake registry contract.
// StakeRegistry returns the address of the stake registry contract.
StakeRegistry(ctx context.Context) (gethcommon.Address, error)

// Returns the address of the operator from the operator id.
// OperatorIDToAddress returns the address of the operator from the operator id.
OperatorIDToAddress(ctx context.Context, operatorId OperatorID) (gethcommon.Address, error)

// Returns the current quorum bitmap for the operator.
// GetCurrentQuorumBitmapByOperatorId returns the current quorum bitmap for the operator.
GetCurrentQuorumBitmapByOperatorId(ctx context.Context, operatorId OperatorID) (*big.Int, error)

// Returns operator set params for the quorum.
// GetOperatorSetParams returns operator set params for the quorum.
GetOperatorSetParams(ctx context.Context, quorumID QuorumID) (*OperatorSetParam, error)

// Returns the number of registered operators for the quorum.
// GetNumberOfRegisteredOperatorForQuorum returns the number of registered operators for the quorum.
GetNumberOfRegisteredOperatorForQuorum(ctx context.Context, quorumID QuorumID) (uint32, error)

// Returns the weight of the operator for the quorum view.
// WeightOfOperatorForQuorum returns the weight of the operator for the quorum view.
WeightOfOperatorForQuorum(ctx context.Context, quorumID QuorumID, operator gethcommon.Address) (*big.Int, error)

// Returns calculated operator churn approval digest hash.
// CalculateOperatorChurnApprovalDigestHash returns calculated operator churn approval digest hash.
CalculateOperatorChurnApprovalDigestHash(
ctx context.Context,
operatorId OperatorID,
Expand All @@ -96,6 +96,9 @@ type Transactor interface {
expiry *big.Int,
) ([32]byte, error)

// Returns the current block number.
// GetCurrentBlockNumber returns the current block number.
GetCurrentBlockNumber(ctx context.Context) (uint32, error)

// GetQuorumCount returns the number of quorums registered at given block number.
GetQuorumCount(ctx context.Context, blockNumber uint32) (uint16, error)
}
47 changes: 43 additions & 4 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"

pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
Expand All @@ -26,10 +27,13 @@ const maxBlobSize = 1024 * 512 // 512 KiB

type DispersalServer struct {
pb.UnimplementedDisperserServer
mu sync.Mutex

config disperser.ServerConfig

blobStore disperser.BlobStore
blobStore disperser.BlobStore
tx core.Transactor
quorumCount uint16

rateConfig RateConfig
ratelimiter common.RateLimiter
Expand All @@ -45,6 +49,7 @@ type DispersalServer struct {
func NewDispersalServer(
config disperser.ServerConfig,
store disperser.BlobStore,
tx core.Transactor,
logger common.Logger,
metrics *disperser.Metrics,
ratelimiter common.RateLimiter,
Expand All @@ -53,6 +58,8 @@ func NewDispersalServer(
return &DispersalServer{
config: config,
blobStore: store,
tx: tx,
quorumCount: 0,
metrics: metrics,
logger: logger,
ratelimiter: ratelimiter,
Expand All @@ -67,11 +74,22 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
defer timer.ObserveDuration()

securityParams := req.GetSecurityParams()
if len(securityParams) == 0 {
return nil, fmt.Errorf("invalid request: security_params must not be empty")
}

// The quorum ID must be in range [0, 255]. It'll actually be converted
// to uint8, so it cannot be greater than 255.
for _, param := range securityParams {
if param.GetQuorumId() > 255 {
return nil, fmt.Errorf("invalid request: the quorum_id must be in range [0, 255], but found %d", param.GetQuorumId())
if param.GetQuorumId() >= uint32(s.quorumCount) {
err := s.updateQuorumCount(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get onchain quorum count: %w", err)
}

if param.GetQuorumId() >= uint32(s.quorumCount) {
return nil, fmt.Errorf("Invalid request: the quorum_id must be in range [0, %d], but found %d", s.quorumCount-1, param.GetQuorumId())
}
}
}

Expand Down Expand Up @@ -198,7 +216,11 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
defer timer.ObserveDuration()

requestID := req.GetRequestId()
s.logger.Info("received a new blob status request", "requestID", requestID)
if len(requestID) == 0 {
return nil, fmt.Errorf("invalid request: request_id must not be empty")
}

s.logger.Info("received a new blob status request", "requestID", string(requestID))
metadataKey, err := disperser.ParseBlobKey(string(requestID))
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,6 +366,23 @@ func (s *DispersalServer) Start(ctx context.Context) error {
return nil
}

func (s *DispersalServer) updateQuorumCount(ctx context.Context) error {
currentBlock, err := s.tx.GetCurrentBlockNumber(ctx)
if err != nil {
return err
}
count, err := s.tx.GetQuorumCount(ctx, currentBlock)
if err != nil {
return err
}

s.logger.Debug("updating quorum count", "currentBlock", currentBlock, "count", count)
s.mu.Lock()
s.quorumCount = count
s.mu.Unlock()
return nil
}

func getResponseStatus(status disperser.BlobStatus) pb.BlobStatus {
switch status {
case disperser.Processing:
Expand Down
Loading

0 comments on commit 1652fa8

Please sign in to comment.