Skip to content

Commit

Permalink
encoding streamer validate quorums
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Dec 6, 2023
1 parent d83442a commit e785104
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 49 deletions.
10 changes: 5 additions & 5 deletions churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (c *churner) createChurnResponse(
}, nil
}

func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, operatorStakes [][]core.OperatorStake, operatorToRegisterAddress gethcommon.Address, currentBlockNumber uint32) ([]core.OperatorToChurn, error) {
func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, operatorStakes core.OperatorStakes, operatorToRegisterAddress gethcommon.Address, currentBlockNumber uint32) ([]core.OperatorToChurn, error) {
operatorsToChurn := make([]core.OperatorToChurn, 0)
for i, quorumID := range quorumIDs {
operatorSetParams, err := c.Transactor.GetOperatorSetParams(ctx, quorumID)
Expand All @@ -184,7 +184,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
return nil, errors.New("maxOperatorCount is 0")
}

if uint32(len(operatorStakes[i])) < operatorSetParams.MaxOperatorCount {
if uint32(len(operatorStakes[quorumID])) < operatorSetParams.MaxOperatorCount {
// quorum is not full, so we can continue
continue
}
Expand All @@ -196,9 +196,9 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op

// loop through operator stakes for the quorum and find the lowest one
totalStake := big.NewInt(0)
lowestStakeOperatorId := operatorStakes[i][0].OperatorID
lowestStake := operatorStakes[i][0].Stake
for _, operatorStake := range operatorStakes[i] {
lowestStakeOperatorId := operatorStakes[quorumID][0].OperatorID
lowestStake := operatorStakes[quorumID][0].Stake
for _, operatorStake := range operatorStakes[quorumID] {
if operatorStake.Stake.Cmp(lowestStake) < 0 {
lowestStake = operatorStake.Stake
lowestStakeOperatorId = operatorStake.OperatorID
Expand Down
6 changes: 3 additions & 3 deletions churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func setupMockTransactor() {
transactorMock.On("GetCurrentQuorumBitmapByOperatorId").Return(big.NewInt(2), nil)
transactorMock.On("GetCurrentBlockNumber").Return(uint32(2), nil)
transactorMock.On("GetQuorumCount").Return(uint16(1), nil)
transactorMock.On("GetOperatorStakesForQuorums").Return([][]dacore.OperatorStake{
{
{
transactorMock.On("GetOperatorStakesForQuorums").Return(dacore.OperatorStakes{
0: {
0: {
OperatorID: makeOperatorId(1),
Stake: big.NewInt(2),
},
Expand Down
10 changes: 1 addition & 9 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,32 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
}

return getOperatorState(operatorsByQuorum, quorums, uint32(blockNumber))

}

func (cs *ChainState) GetCurrentBlockNumber() (uint, error) {

ctx := context.Background()
header, err := cs.Client.HeaderByNumber(ctx, nil)
if err != nil {
return 0, err
}

return uint(header.Number.Uint64()), nil

}

func getOperatorState(operatorsByQuorum [][]core.OperatorStake, quorumIds []core.QuorumID, blockNumber uint32) (*core.OperatorState, error) {
func getOperatorState(operatorsByQuorum core.OperatorStakes, quorumIds []core.QuorumID, blockNumber uint32) (*core.OperatorState, error) {
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
totals := make(map[core.QuorumID]*core.OperatorInfo)

for i, quorum := range operatorsByQuorum {

totalStake := big.NewInt(0)

operators[quorumIds[i]] = make(map[core.OperatorID]*core.OperatorInfo)

for ind, op := range quorum {

operators[quorumIds[i]][op.OperatorID] = &core.OperatorInfo{
Stake: op.Stake,
Index: core.OperatorIndex(ind),
}
totalStake.Add(totalStake, op.Stake)

}

totals[quorumIds[i]] = &core.OperatorInfo{
Expand All @@ -87,5 +80,4 @@ func getOperatorState(operatorsByQuorum [][]core.OperatorStake, quorumIds []core
}

return state, nil

}
21 changes: 13 additions & 8 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er
// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.OperatorID, blockNumber uint32) ([][]core.OperatorStake, []core.QuorumID, error) {
func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
quorumBitmap, state_, err := t.Bindings.BLSOpStateRetriever.GetOperatorState0(&bind.CallOpts{
Context: ctx,
}, t.Bindings.RegCoordinatorAddr, operator, blockNumber)
Expand All @@ -316,11 +316,13 @@ func (t *Transactor) GetOperatorStakes(ctx context.Context, operator core.Operat
return nil, nil, err
}

state := make([][]core.OperatorStake, len(state_))
state := make(core.OperatorStakes, len(state_))
for i := range state_ {
state[i] = make([]core.OperatorStake, len(state_[i]))
quorumID := core.QuorumID(i)
state[quorumID] = make(map[core.OperatorIndex]core.OperatorStake, len(state_[i]))
for j, op := range state_[i] {
state[i][j] = core.OperatorStake{
operatorIndex := core.OperatorIndex(j)
state[quorumID][operatorIndex] = core.OperatorStake{
Stake: op.Stake,
OperatorID: op.OperatorId,
}
Expand Down Expand Up @@ -356,12 +358,13 @@ func (t *Transactor) GetStoreDurationBlocks(ctx context.Context) (uint32, error)

// GetOperatorStakesForQuorums returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
// The indices of the operators within each quorum are also returned.
func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) ([][]core.OperatorStake, error) {
func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakes, error) {
quorumBytes := make([]byte, len(quorums))
for ind, quorum := range quorums {
quorumBytes[ind] = byte(uint8(quorum))
}

// state_ is a [][]*opstateretriever.OperatorStake indexed by quorum number and then by operator index
state_, err := t.Bindings.BLSOpStateRetriever.GetOperatorState(&bind.CallOpts{
Context: ctx,
}, t.Bindings.RegCoordinatorAddr, quorumBytes, blockNumber)
Expand All @@ -370,11 +373,13 @@ func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []
return nil, err
}

state := make([][]core.OperatorStake, len(state_))
state := make(core.OperatorStakes, len(state_))
for i := range state_ {
state[i] = make([]core.OperatorStake, len(state_[i]))
quorumID := core.QuorumID(i)
state[quorumID] = make(map[core.OperatorIndex]core.OperatorStake, len(state_[i]))
for j, op := range state_[i] {
state[i][j] = core.OperatorStake{
operatorIndex := core.OperatorIndex(j)
state[quorumID][operatorIndex] = core.OperatorStake{
Stake: op.Stake,
OperatorID: op.OperatorId,
}
Expand Down
8 changes: 4 additions & 4 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string
return args.Error(0)
}

func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) ([][]core.OperatorStake, []core.QuorumID, error) {
func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
args := t.Called()
result0 := args.Get(0)
result1 := args.Get(1)
return result0.([][]core.OperatorStake), result1.([]core.QuorumID), args.Error(1)
return result0.(core.OperatorStakes), result1.([]core.QuorumID), args.Error(1)
}

func (t *MockTransactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) ([][]core.OperatorStake, error) {
func (t *MockTransactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakes, error) {
args := t.Called()
result := args.Get(0)
return result.([][]core.OperatorStake), args.Error(1)
return result.(core.OperatorStakes), args.Error(1)
}

func (t *MockTransactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) {
Expand Down
51 changes: 39 additions & 12 deletions core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,22 @@ func (ics *indexedChainState) Start(ctx context.Context) error {
}

// GetIndexedOperatorState returns the IndexedOperatorState for the given block number and quorums
// If the quorum is not found, the quorum will be ignored and the IndexedOperatorState will be returned for the remaining quorums
func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {
operatorState, err := ics.ChainState.GetOperatorState(ctx, blockNumber, quorums)
if err != nil {
return nil, err
}

aggregatePublicKeys, err := ics.getQuorumAPKs(ctx, quorums, uint32(blockNumber))
if err != nil {
return nil, err
aggregatePublicKeys := ics.getQuorumAPKs(ctx, quorums, uint32(blockNumber))
aggKeys := make(map[uint8]*core.G1Point)
for _, apk := range aggregatePublicKeys {
if apk.Err == nil && apk.AggregatePubk != nil {
aggKeys[apk.QuorumNumber] = apk.AggregatePubk
}
}
if len(aggKeys) == 0 {
return nil, errors.New("no aggregate public keys found for any of the specified quorums")
}

indexedOperators, err := ics.getRegisteredIndexedOperatorInfo(ctx, uint32(blockNumber))
Expand All @@ -125,7 +132,7 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block
state := &core.IndexedOperatorState{
OperatorState: operatorState,
IndexedOperators: indexedOperators,
AggKeys: aggregatePublicKeys,
AggKeys: aggKeys,
}
return state, nil
}
Expand All @@ -147,21 +154,41 @@ func (ics *indexedChainState) GetIndexedOperatorInfoByOperatorId(ctx context.Con
return convertIndexedOperatorInfoGqlToIndexedOperatorInfo(&query.Operator)
}

type quorumAPK struct {
QuorumNumber uint8
AggregatePubk *core.G1Point
Err error
}

// GetQuorumAPKs returns the Aggregate Public Keys for the given quorums at the given block number
func (ics *indexedChainState) getQuorumAPKs(ctx context.Context, quorumIDs []core.QuorumID, blockNumber uint32) (map[uint8]*core.G1Point, error) {
quorumAPKs := make(map[uint8]*core.G1Point)
func (ics *indexedChainState) getQuorumAPKs(ctx context.Context, quorumIDs []core.QuorumID, blockNumber uint32) map[uint8]*quorumAPK {
quorumAPKs := make(map[uint8]*quorumAPK)
for i := range quorumIDs {
id := quorumIDs[i]
quorumAPK, err := ics.getQuorumAPK(ctx, id, blockNumber)
apk, err := ics.getQuorumAPK(ctx, id, blockNumber)
if err != nil {
return nil, err
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: nil,
Err: err,
}
continue
}
if apk == nil {
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: nil,
Err: fmt.Errorf("quorum APK not found for quorum %d", id),
}
continue
}
if quorumAPK == nil {
return nil, fmt.Errorf("quorum APK not found for quorum %d", id)
quorumAPKs[id] = &quorumAPK{
QuorumNumber: uint8(id),
AggregatePubk: apk,
Err: nil,
}
quorumAPKs[id] = quorumAPK
}
return quorumAPKs, nil
return quorumAPKs
}

// GetQuorumAPK returns the Aggregate Public Key for the given quorum at the given block number
Expand Down
6 changes: 4 additions & 2 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type OperatorSetParam struct {
ChurnBIPsOfTotalStake uint16
}

type OperatorStakes map[QuorumID]map[OperatorIndex]OperatorStake

type Transactor interface {

// RegisterBLSPublicKey registers a new BLS public key with the pubkey compendium smart contract.
Expand Down Expand Up @@ -54,11 +56,11 @@ type Transactor interface {
// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
GetOperatorStakes(ctx context.Context, operatorID OperatorID, blockNumber uint32) ([][]OperatorStake, []QuorumID, error)
GetOperatorStakes(ctx context.Context, operatorID OperatorID, blockNumber uint32) (OperatorStakes, []QuorumID, error)

// GetOperatorStakes returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
// The indices of the operators within each quorum are also returned.
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) ([][]OperatorStake, error)
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakes, error)

// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds
// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail.
Expand Down
37 changes: 31 additions & 6 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan

e.logger.Trace("[encodingstreamer] new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))

batchMetadata, err := e.getBatchMetadata(ctx, metadatas, referenceBlockNumber)
var batchMetadata *batchMetadata
batchMetadata, metadatas, err = e.getBatchMetadata(ctx, metadatas, referenceBlockNumber)
if err != nil {
return fmt.Errorf("error getting quorum infos: %w", err)
}
Expand Down Expand Up @@ -504,7 +505,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
i++
}

batchMetadata, err := e.getBatchMetadata(context.Background(), metadatas, e.ReferenceBlockNumber)
batchMetadata, _, err := e.getBatchMetadata(context.Background(), metadatas, e.ReferenceBlockNumber)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -538,7 +539,7 @@ func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
}
}

func (e *EncodingStreamer) getBatchMetadata(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*batchMetadata, error) {
func (e *EncodingStreamer) getBatchMetadata(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*batchMetadata, []*disperser.BlobMetadata, error) {
quorums := make(map[core.QuorumID]QuorumInfo, 0)
for _, metadata := range metadatas {
for _, quorum := range metadata.RequestMetadata.SecurityParams {
Expand All @@ -556,13 +557,37 @@ func (e *EncodingStreamer) getBatchMetadata(ctx context.Context, metadatas []*di
// Get the operator state
state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds)
if err != nil {
return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err)
return nil, nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err)
}

for q := range quorums {
if _, ok := state.AggKeys[q]; !ok {
delete(quorums, q)
}
}
validMetadata := make([]*disperser.BlobMetadata, 0)
for _, metadata := range metadatas {
valid := true
for _, quorum := range metadata.RequestMetadata.SecurityParams {
if aggKey, ok := state.AggKeys[quorum.QuorumID]; !ok || aggKey == nil {
e.logger.Warn("got blob with a quorum without APK. Will skip.", "quorum", quorum.QuorumID)
valid = false
}
}
if valid {
validMetadata = append(validMetadata, metadata)
} else {
err := e.blobStore.HandleBlobFailure(ctx, metadata, 0)
if err != nil {
e.logger.Error("error handling blob failure", "err", err)
}
}
}

for quorumID := range quorums {
assignments, info, err := e.assignmentCoordinator.GetAssignments(state.OperatorState, quorumID, QuantizationFactor)
if err != nil {
return nil, err
return nil, nil, err
}
quorums[quorumID] = QuorumInfo{
Assignments: assignments,
Expand All @@ -574,5 +599,5 @@ func (e *EncodingStreamer) getBatchMetadata(ctx context.Context, metadatas []*di
return &batchMetadata{
QuorumInfos: quorums,
State: state,
}, nil
}, validMetadata, nil
}
Loading

0 comments on commit e785104

Please sign in to comment.