Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[encodingstreamer] Validate quorums #109

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (c *churner) createChurnResponse(
}, nil
}

func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, operatorStakes [][]core.OperatorStake, operatorToRegisterAddress gethcommon.Address, currentBlockNumber uint32) ([]core.OperatorToChurn, error) {
func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, operatorStakes core.OperatorStakes, operatorToRegisterAddress gethcommon.Address, currentBlockNumber uint32) ([]core.OperatorToChurn, error) {
operatorsToChurn := make([]core.OperatorToChurn, 0)
for i, quorumID := range quorumIDs {
operatorSetParams, err := c.Transactor.GetOperatorSetParams(ctx, quorumID)
Expand All @@ -185,7 +185,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
return nil, errors.New("maxOperatorCount is 0")
}

if uint32(len(operatorStakes[i])) < operatorSetParams.MaxOperatorCount {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly a bug in previous implementation.
operatorStakes should be indexed by quorum ID instead of index of quorumIDs

if uint32(len(operatorStakes[quorumID])) < operatorSetParams.MaxOperatorCount {
// quorum is not full, so we can continue
continue
}
Expand All @@ -197,9 +197,9 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op

// loop through operator stakes for the quorum and find the lowest one
totalStake := big.NewInt(0)
lowestStakeOperatorId := operatorStakes[i][0].OperatorID
lowestStake := operatorStakes[i][0].Stake
for _, operatorStake := range operatorStakes[i] {
lowestStakeOperatorId := operatorStakes[quorumID][0].OperatorID
lowestStake := operatorStakes[quorumID][0].Stake
for _, operatorStake := range operatorStakes[quorumID] {
if operatorStake.Stake.Cmp(lowestStake) < 0 {
lowestStake = operatorStake.Stake
lowestStakeOperatorId = operatorStake.OperatorID
Expand Down
6 changes: 3 additions & 3 deletions churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func setupMockTransactor() {
transactorMock.On("GetCurrentQuorumBitmapByOperatorId").Return(big.NewInt(2), nil)
transactorMock.On("GetCurrentBlockNumber").Return(uint32(2), nil)
transactorMock.On("GetQuorumCount").Return(uint16(1), nil)
transactorMock.On("GetOperatorStakesForQuorums").Return([][]dacore.OperatorStake{
{
{
transactorMock.On("GetOperatorStakesForQuorums").Return(dacore.OperatorStakes{
0: {
0: {
OperatorID: makeOperatorId(1),
Stake: big.NewInt(2),
},
Expand Down
18 changes: 5 additions & 13 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,35 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
}

return getOperatorState(operatorsByQuorum, quorums, uint32(blockNumber))

}

func (cs *ChainState) GetCurrentBlockNumber() (uint, error) {

ctx := context.Background()
header, err := cs.Client.HeaderByNumber(ctx, nil)
if err != nil {
return 0, err
}

return uint(header.Number.Uint64()), nil

}

func getOperatorState(operatorsByQuorum [][]core.OperatorStake, quorumIds []core.QuorumID, blockNumber uint32) (*core.OperatorState, error) {
func getOperatorState(operatorsByQuorum core.OperatorStakes, quorumIds []core.QuorumID, blockNumber uint32) (*core.OperatorState, error) {
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
totals := make(map[core.QuorumID]*core.OperatorInfo)

for i, quorum := range operatorsByQuorum {

for quorumID, quorum := range operatorsByQuorum {
totalStake := big.NewInt(0)

operators[quorumIds[i]] = make(map[core.OperatorID]*core.OperatorInfo)
operators[quorumID] = make(map[core.OperatorID]*core.OperatorInfo)

for ind, op := range quorum {

operators[quorumIds[i]][op.OperatorID] = &core.OperatorInfo{
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
Stake: op.Stake,
Index: core.OperatorIndex(ind),
}
totalStake.Add(totalStake, op.Stake)

}

totals[quorumIds[i]] = &core.OperatorInfo{
totals[quorumID] = &core.OperatorInfo{
Stake: totalStake,
Index: core.OperatorIndex(len(quorum)),
}
Expand All @@ -87,5 +80,4 @@ func getOperatorState(operatorsByQuorum [][]core.OperatorStake, quorumIds []core
}

return state, nil

}
21 changes: 13 additions & 8 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er
// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.OperatorID, blockNumber uint32) ([][]core.OperatorStake, []core.QuorumID, error) {
func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
quorumBitmap, state_, err := t.Bindings.BLSOpStateRetriever.GetOperatorState0(&bind.CallOpts{
Context: ctx,
}, t.Bindings.RegCoordinatorAddr, operator, blockNumber)
Expand All @@ -341,11 +341,13 @@ func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.Operat
return nil, nil, err
}

state := make([][]core.OperatorStake, len(state_))
state := make(core.OperatorStakes, len(state_))
for i := range state_ {
state[i] = make([]core.OperatorStake, len(state_[i]))
quorumID := core.QuorumID(i)
state[quorumID] = make(map[core.OperatorIndex]core.OperatorStake, len(state_[i]))
for j, op := range state_[i] {
state[i][j] = core.OperatorStake{
operatorIndex := core.OperatorIndex(j)
state[quorumID][operatorIndex] = core.OperatorStake{
Stake: op.Stake,
OperatorID: op.OperatorId,
}
Expand Down Expand Up @@ -381,12 +383,13 @@ func (t *Transactor) GetStoreDurationBlocks(ctx context.Context) (uint32, error)

// GetOperatorStakesForQuorums returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
// The indices of the operators within each quorum are also returned.
func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) ([][]core.OperatorStake, error) {
func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakes, error) {
quorumBytes := make([]byte, len(quorums))
for ind, quorum := range quorums {
quorumBytes[ind] = byte(uint8(quorum))
}

// state_ is a [][]*opstateretriever.OperatorStake with the same length and order as quorumBytes, and then indexed by operator index
state_, err := t.Bindings.BLSOpStateRetriever.GetOperatorState(&bind.CallOpts{
Context: ctx,
}, t.Bindings.RegCoordinatorAddr, quorumBytes, blockNumber)
Expand All @@ -395,11 +398,13 @@ func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []
return nil, err
}

state := make([][]core.OperatorStake, len(state_))
state := make(core.OperatorStakes, len(state_))
for i := range state_ {
state[i] = make([]core.OperatorStake, len(state_[i]))
quorumID := quorums[i]
state[quorumID] = make(map[core.OperatorIndex]core.OperatorStake, len(state_[i]))
for j, op := range state_[i] {
state[i][j] = core.OperatorStake{
operatorIndex := core.OperatorIndex(j)
state[quorumID][operatorIndex] = core.OperatorStake{
Stake: op.Stake,
OperatorID: op.OperatorId,
}
Expand Down
2 changes: 1 addition & 1 deletion core/indexer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, block
for _, quorum := range quorums {
key, ok := pubkeys.QuorumTotals[quorum]
if !ok {
return nil, errors.New("aggregate key for quorum not found")
continue
}
aggKeys[quorum] = &core.G1Point{G1Affine: key}
}
Expand Down
8 changes: 4 additions & 4 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string
return args.Error(0)
}

func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) ([][]core.OperatorStake, []core.QuorumID, error) {
func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
args := t.Called()
result0 := args.Get(0)
result1 := args.Get(1)
return result0.([][]core.OperatorStake), result1.([]core.QuorumID), args.Error(1)
return result0.(core.OperatorStakes), result1.([]core.QuorumID), args.Error(1)
}

func (t *MockTransactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) ([][]core.OperatorStake, error) {
func (t *MockTransactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakes, error) {
args := t.Called()
result := args.Get(0)
return result.([][]core.OperatorStake), args.Error(1)
return result.(core.OperatorStakes), args.Error(1)
}

func (t *MockTransactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) {
Expand Down
2 changes: 2 additions & 0 deletions core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type ChainState interface {
// ChainState is an interface for getting information about the current chain state.
type IndexedChainState interface {
ChainState
// GetIndexedOperatorState returns the IndexedOperatorState for the given block number and quorums
// If the quorum is not found, the quorum will be ignored and the IndexedOperatorState will be returned for the remaining quorums
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []QuorumID) (*IndexedOperatorState, error)
Start(context context.Context) error
}
55 changes: 42 additions & 13 deletions core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,25 @@ func (ics *indexedChainState) Start(ctx context.Context) error {
}
}

// GetIndexedOperatorState returns the IndexedOperatorState for the given block number and quorums
func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {
operatorState, err := ics.ChainState.GetOperatorState(ctx, blockNumber, quorums)
if err != nil {
return nil, err
}

aggregatePublicKeys, err := ics.getQuorumAPKs(ctx, quorums, uint32(blockNumber))
if err != nil {
return nil, err
aggregatePublicKeys := ics.getQuorumAPKs(ctx, quorums, uint32(blockNumber))
aggKeys := make(map[uint8]*core.G1Point)
for _, apk := range aggregatePublicKeys {
if apk.Err != nil {
ics.logger.Warn("Error getting aggregate public key", "err", apk.Err)
continue
}
if apk.Err == nil && apk.AggregatePubk != nil {
aggKeys[apk.QuorumNumber] = apk.AggregatePubk
}
}
if len(aggKeys) == 0 {
return nil, errors.New("no aggregate public keys found for any of the specified quorums")
}

indexedOperators, err := ics.getRegisteredIndexedOperatorInfo(ctx, uint32(blockNumber))
Expand All @@ -125,7 +134,7 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block
state := &core.IndexedOperatorState{
OperatorState: operatorState,
IndexedOperators: indexedOperators,
AggKeys: aggregatePublicKeys,
AggKeys: aggKeys,
}
return state, nil
}
Expand All @@ -147,21 +156,41 @@ func (ics *indexedChainState) GetIndexedOperatorInfoByOperatorId(ctx context.Con
return convertIndexedOperatorInfoGqlToIndexedOperatorInfo(&query.Operator)
}

type quorumAPK struct {
QuorumNumber uint8
AggregatePubk *core.G1Point
Err error
}

// GetQuorumAPKs returns the Aggregate Public Keys for the given quorums at the given block number
func (ics *indexedChainState) getQuorumAPKs(ctx context.Context, quorumIDs []core.QuorumID, blockNumber uint32) (map[uint8]*core.G1Point, error) {
quorumAPKs := make(map[uint8]*core.G1Point)
func (ics *indexedChainState) getQuorumAPKs(ctx context.Context, quorumIDs []core.QuorumID, blockNumber uint32) map[uint8]*quorumAPK {
quorumAPKs := make(map[uint8]*quorumAPK)
for i := range quorumIDs {
id := quorumIDs[i]
quorumAPK, err := ics.getQuorumAPK(ctx, id, blockNumber)
apk, err := ics.getQuorumAPK(ctx, id, blockNumber)
if err != nil {
return nil, err
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: nil,
Err: err,
}
continue
}
if apk == nil {
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: nil,
Err: fmt.Errorf("quorum APK not found for quorum %d", id),
}
continue
}
if quorumAPK == nil {
return nil, fmt.Errorf("quorum APK not found for quorum %d", id)
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: apk,
Err: nil,
}
quorumAPKs[id] = quorumAPK
}
return quorumAPKs, nil
return quorumAPKs
}

// GetQuorumAPK returns the Aggregate Public Key for the given quorum at the given block number
Expand Down
6 changes: 4 additions & 2 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type OperatorSetParam struct {
ChurnBIPsOfTotalStake uint16
}

type OperatorStakes map[QuorumID]map[OperatorIndex]OperatorStake
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced existing use of [][]OperatorStake with this new type as it's more clear how OperaterStake are indexed


type Transactor interface {

// RegisterBLSPublicKey registers a new BLS public key with the pubkey compendium smart contract.
Expand Down Expand Up @@ -54,11 +56,11 @@ type Transactor interface {
// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
GetOperatorStakes(ctx context.Context, operatorID OperatorID, blockNumber uint32) ([][]OperatorStake, []QuorumID, error)
GetOperatorStakes(ctx context.Context, operatorID OperatorID, blockNumber uint32) (OperatorStakes, []QuorumID, error)

// GetOperatorStakes returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
// The indices of the operators within each quorum are also returned.
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) ([][]OperatorStake, error)
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakes, error)

// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds
// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail.
Expand Down
33 changes: 28 additions & 5 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
e.logger.Trace("[encodingstreamer] new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))

// Get the operator state
state, err := e.getOperatorStateForBlobs(ctx, metadatas, referenceBlockNumber)
state, err := e.getOperatorState(ctx, metadatas, referenceBlockNumber)
if err != nil {
return fmt.Errorf("error getting operator state: %w", err)
}
metadatas = e.validateMetadataQuorums(metadatas, state)

metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata, 0)
for _, metadata := range metadatas {
Expand Down Expand Up @@ -510,7 +511,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
i++
}

state, err := e.getOperatorStateForBlobs(context.Background(), metadatas, e.ReferenceBlockNumber)
state, err := e.getOperatorState(context.Background(), metadatas, e.ReferenceBlockNumber)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -544,7 +545,8 @@ func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
}
}

func (e *EncodingStreamer) getOperatorStateForBlobs(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*core.IndexedOperatorState, error) {
// getOperatorState returns the operator state for the blobs that have valid quorums
func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*core.IndexedOperatorState, error) {

quorums := make(map[core.QuorumID]QuorumInfo, 0)
for _, metadata := range metadatas {
Expand All @@ -560,12 +562,33 @@ func (e *EncodingStreamer) getOperatorStateForBlobs(ctx context.Context, metadat
i++
}

// Get the operator state
// GetIndexedOperatorState should return state for valid quorums only
state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds)
if err != nil {
return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err)
}

return state, nil
}

// It also returns the list of valid blob metadatas (i.e. blobs that have valid quorums)
func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMetadata, state *core.IndexedOperatorState) []*disperser.BlobMetadata {
validMetadata := make([]*disperser.BlobMetadata, 0)
for _, metadata := range metadatas {
valid := true
for _, quorum := range metadata.RequestMetadata.SecurityParams {
if aggKey, ok := state.AggKeys[quorum.QuorumID]; !ok || aggKey == nil {
e.logger.Warn("got blob with a quorum without APK. Will skip.", "quorum", quorum.QuorumID)
valid = false
}
}
if valid {
validMetadata = append(validMetadata, metadata)
} else {
err := e.blobStore.HandleBlobFailure(context.Background(), metadata, 0)
if err != nil {
e.logger.Error("error handling blob failure", "err", err)
}
}
}
return validMetadata
}
Loading
Loading