diff --git a/core/aggregation.go b/core/aggregation.go index 3e156d24ff..2a023e9bb7 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -22,10 +22,11 @@ var ( ErrAggSigNotValid = errors.New("aggregated signature is not valid") ) -type SignerMessage struct { - Signature *Signature - Operator OperatorID - Err error +type SigningMessage struct { + Signature *Signature + Operator OperatorID + BatchHeaderHash [32]byte + Err error } // SignatureAggregation contains the results of aggregating signatures from a set of operators @@ -53,7 +54,7 @@ type SignatureAggregator interface { // AggregateSignatures blocks until it receives a response for each operator in the operator state via messageChan, and then returns the aggregated signature. // If the aggregated signature is invalid, an error is returned. - AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) + AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error) } type StdSignatureAggregator struct { @@ -78,7 +79,7 @@ func NewStdSignatureAggregator(logger logging.Logger, transactor Transactor) (*S var _ SignatureAggregator = (*StdSignatureAggregator)(nil) -func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) { +func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error) { // TODO: Add logging if len(quorumIDs) == 0 { @@ -127,7 +128,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state socket = op.Socket } if r.Err != nil { - a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "err", r.Err) + a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", r.BatchHeaderHash, "err", r.Err) continue } @@ -170,7 +171,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state aggPubKeys[ind].Add(op.PubkeyG2) } } - a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums)) + a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", r.BatchHeaderHash) } // Aggregate Non signer Pubkey Id diff --git a/core/aggregation_test.go b/core/aggregation_test.go index 20ae2e45ed..61d722cc5e 100644 --- a/core/aggregation_test.go +++ b/core/aggregation_test.go @@ -42,7 +42,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SignerMessage, advCount uint) { +func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SigningMessage, advCount uint) { count := 0 @@ -54,13 +54,13 @@ func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update op := state.PrivateOperators[id] sig := op.KeyPair.SignMessage(message) if count < len(state.IndexedOperators)-int(advCount) { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: sig, Operator: id, Err: nil, } } else { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: nil, Operator: id, Err: errors.New("adversary"), @@ -154,7 +154,7 @@ func TestAggregateSignaturesStatus(t *testing.T) { t.Run(tt.name, func(t *testing.T) { state := dat.GetTotalOperatorStateWithQuorums(context.Background(), 0, []core.QuorumID{0, 1}) - update := make(chan core.SignerMessage) + update := make(chan core.SigningMessage) message := [32]byte{1, 2, 3, 4, 5, 6} go simulateOperators(*state, message, update, tt.adversaryCount) @@ -183,7 +183,7 @@ func TestSortNonsigners(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) - update := make(chan core.SignerMessage) + update := make(chan core.SigningMessage) message := [32]byte{1, 2, 3, 4, 5, 6} go simulateOperators(*state, message, update, 4) diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 49e6fd0928..26881e6fe3 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -37,8 +37,8 @@ func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.Dispatch var _ disperser.Dispatcher = (*dispatcher)(nil) -func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SignerMessage { - update := make(chan core.SignerMessage, len(state.IndexedOperators)) +func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage { + update := make(chan core.SigningMessage, len(state.IndexedOperators)) // Disperse c.sendAllChunks(ctx, state, blobs, batchHeader, update) @@ -46,11 +46,15 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } -func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SignerMessage) { +func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { blobMessages := make([]*core.BlobMessage, 0) hasAnyBundles := false + batchHeaderHash, err := batchHeader.GetBatchHeaderHash() + if err != nil { + return + } for _, blob := range blobs { if _, ok := blob.BundlesByOperator[id]; ok { hasAnyBundles = true @@ -63,35 +67,31 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } if !hasAnyBundles { // Operator is not part of any quorum, no need to send chunks - update <- core.SignerMessage{ - Err: errors.New("operator is not part of any quorum"), - Signature: nil, - Operator: id, + update <- core.SigningMessage{ + Err: errors.New("operator is not part of any quorum"), + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, } return } - batchHeaderHash, err := batchHeader.GetBatchHeaderHash() - if err != nil { - return - } requestedAt := time.Now() sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op) if err != nil { - update <- core.SignerMessage{ - Err: err, - Signature: nil, - Operator: id, + update <- core.SigningMessage{ + Err: err, + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, } - c.logger.Warn("Failed to send chunks to operator", "batchHeaderHash", batchHeaderHash, "operator", op.Socket, "err", err) c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds())) } else { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: sig, Operator: id, Err: nil, } - c.logger.Debug("Successfully sent chunks to operator", "batchHeaderHash", batchHeaderHash, "operator", op.Socket) c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds())) } diff --git a/disperser/disperser.go b/disperser/disperser.go index 004683a888..4b16b2933c 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -174,7 +174,7 @@ type BlobStore interface { } type Dispatcher interface { - DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SignerMessage + DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 33dd9bd59e..980bf0c9fd 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -20,12 +20,12 @@ func NewDispatcher(state *mock.PrivateOperatorState) disperser.Dispatcher { } } -func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SignerMessage { - update := make(chan core.SignerMessage) +func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage { + update := make(chan core.SigningMessage) message, err := header.GetBatchHeaderHash() if err != nil { for id := range d.state.PrivateOperators { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: nil, Operator: id, Err: err, @@ -37,7 +37,7 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera for id, op := range d.state.PrivateOperators { sig := op.KeyPair.SignMessage(message) - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: sig, Operator: id, Err: nil,