
Commit

[node] Log operator state hash on validation failure (#584)
ian-shim authored and pschork committed May 30, 2024
1 parent 52510fb commit d825980
Showing 4 changed files with 166 additions and 2 deletions.
43 changes: 43 additions & 0 deletions core/state.go
@@ -2,8 +2,11 @@ package core

 import (
     "context"
+    "crypto/md5"
+    "encoding/json"
     "fmt"
     "math/big"
+    "slices"
     "strings"
 )

@@ -60,6 +63,46 @@ type OperatorState struct {
     BlockNumber uint
 }
 
+func (s *OperatorState) Hash() (map[QuorumID][16]byte, error) {
+    res := make(map[QuorumID][16]byte)
+    type operatorInfoWithID struct {
+        OperatorID string
+        Stake      string
+        Index      uint
+    }
+    for quorumID, opInfos := range s.Operators {
+        marshalable := struct {
+            Operators   []operatorInfoWithID
+            Totals      OperatorInfo
+            BlockNumber uint
+        }{
+            Operators:   make([]operatorInfoWithID, 0, len(opInfos)),
+            Totals:      OperatorInfo{},
+            BlockNumber: s.BlockNumber,
+        }
+
+        for opID, opInfo := range opInfos {
+            marshalable.Operators = append(marshalable.Operators, operatorInfoWithID{
+                OperatorID: opID.Hex(),
+                Stake:      opInfo.Stake.String(),
+                Index:      uint(opInfo.Index),
+            })
+        }
+        slices.SortStableFunc(marshalable.Operators, func(a, b operatorInfoWithID) int {
+            return strings.Compare(a.OperatorID, b.OperatorID)
+        })
+
+        marshalable.Totals = *s.Totals[quorumID]
+        data, err := json.Marshal(marshalable)
+        if err != nil {
+            return nil, err
+        }
+        res[quorumID] = md5.Sum(data)
+    }
+
+    return res, nil
+}
+
 // IndexedOperatorInfo contains information about an operator which is contained in events from the EigenDA smart contracts. Note that
 // this information does not depend on the quorum.
 type IndexedOperatorInfo struct {
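For context, a minimal usage sketch of the new method. The state value below is hypothetical and for illustration only; it is not part of the commit:

    package main

    import (
        "encoding/hex"
        "fmt"
        "math/big"

        "github.com/Layr-Labs/eigenda/core"
    )

    func main() {
        // Hypothetical single-quorum, single-operator state for illustration.
        state := core.OperatorState{
            Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
                0: {
                    [32]byte{1}: &core.OperatorInfo{Stake: big.NewInt(100), Index: uint(0)},
                },
            },
            Totals: map[core.QuorumID]*core.OperatorInfo{
                0: {Stake: big.NewInt(100), Index: uint(0)},
            },
            BlockNumber: uint(1),
        }

        // Hash returns one MD5 digest per quorum; two parties holding the same
        // operators, stakes, indices, totals, and block number compute the same digest.
        hashes, err := state.Hash()
        if err != nil {
            panic(err)
        }
        for quorumID, digest := range hashes {
            fmt.Printf("quorum %d: %s\n", quorumID, hex.EncodeToString(digest[:]))
        }
    }

Because operators are sorted by hex ID before marshaling, the digest is stable across processes, which is what makes it comparable between disperser and node logs.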
98 changes: 98 additions & 0 deletions core/state_test.go
@@ -0,0 +1,98 @@
+package core_test
+
+import (
+    "encoding/hex"
+    "math/big"
+    "testing"
+
+    "github.com/Layr-Labs/eigenda/core"
+    "github.com/stretchr/testify/assert"
+)
+
+func TestOperatorStateHash(t *testing.T) {
+    s1 := core.OperatorState{
+        Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
+            0: {
+                [32]byte{0}: &core.OperatorInfo{
+                    Stake: big.NewInt(12),
+                    Index: uint(2),
+                },
+                [32]byte{1}: &core.OperatorInfo{
+                    Stake: big.NewInt(23),
+                    Index: uint(3),
+                },
+            },
+            1: {
+                [32]byte{1}: &core.OperatorInfo{
+                    Stake: big.NewInt(23),
+                    Index: uint(3),
+                },
+                [32]byte{2}: &core.OperatorInfo{
+                    Stake: big.NewInt(34),
+                    Index: uint(4),
+                },
+            },
+        },
+        Totals: map[core.QuorumID]*core.OperatorInfo{
+            0: {
+                Stake: big.NewInt(35),
+                Index: uint(2),
+            },
+            1: {
+                Stake: big.NewInt(57),
+                Index: uint(2),
+            },
+        },
+        BlockNumber: uint(123),
+    }
+
+    hash1, err := s1.Hash()
+    assert.NoError(t, err)
+    q0 := hash1[0]
+    q1 := hash1[1]
+    assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:]))
+    assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
+
+    s2 := core.OperatorState{
+        Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
+            0: {
+                [32]byte{0}: &core.OperatorInfo{
+                    Stake: big.NewInt(12),
+                    Index: uint(3), // different from s1
+                },
+                [32]byte{1}: &core.OperatorInfo{
+                    Stake: big.NewInt(23),
+                    Index: uint(3),
+                },
+            },
+            1: {
+                [32]byte{1}: &core.OperatorInfo{
+                    Stake: big.NewInt(23),
+                    Index: uint(3),
+                },
+                [32]byte{2}: &core.OperatorInfo{
+                    Stake: big.NewInt(34),
+                    Index: uint(4),
+                },
+            },
+        },
+        Totals: map[core.QuorumID]*core.OperatorInfo{
+            0: {
+                Stake: big.NewInt(35),
+                Index: uint(2),
+            },
+            1: {
+                Stake: big.NewInt(57),
+                Index: uint(2),
+            },
+        },
+        BlockNumber: uint(123),
+    }
+
+    hash2, err := s2.Hash()
+    assert.NoError(t, err)
+    q0 = hash2[0]
+    q1 = hash2[1]
+    assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:]))
+    assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
+}
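A note on the sort inside Hash: Go randomizes map iteration order, so without slices.SortStableFunc the marshaled JSON (and therefore the MD5) could differ between calls on identical states. A hypothetical companion test, reusing the package and imports above, would pin this determinism down:

    // Hypothetical addition, not part of this commit: hashing the same state
    // twice must yield identical digests despite randomized map iteration.
    func TestOperatorStateHashDeterministic(t *testing.T) {
        s := core.OperatorState{
            Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
                0: {
                    [32]byte{0}: &core.OperatorInfo{Stake: big.NewInt(12), Index: uint(2)},
                    [32]byte{1}: &core.OperatorInfo{Stake: big.NewInt(23), Index: uint(3)},
                },
            },
            Totals: map[core.QuorumID]*core.OperatorInfo{
                0: {Stake: big.NewInt(35), Index: uint(2)},
            },
            BlockNumber: uint(123),
        }

        h1, err := s.Hash()
        assert.NoError(t, err)
        h2, err := s.Hash()
        assert.NoError(t, err)
        assert.Equal(t, h1, h2)
    }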
9 changes: 9 additions & 0 deletions disperser/batcher/batcher.go
@@ -420,6 +420,15 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
     stageTimer = time.Now()
     update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader)
     log.Debug("DisperseBatch took", "duration", time.Since(stageTimer))
+    h, err := batch.State.OperatorState.Hash()
+    if err != nil {
+        log.Error("HandleSingleBatch: error getting operator state hash", "err", err)
+    }
+    hStr := make([]string, 0, len(h))
+    for q, hash := range h {
+        hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash))
+    }
+    log.Info("Dispatched encoded batch", "operatorStateHash", hStr)
 
     // Get the batch header hash
     log.Debug("Getting batch header hash...")
18 changes: 16 additions & 2 deletions node/node.go
@@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

@@ -360,7 +361,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
log.Error("Failed to delete the invalid batch that should be rolled back", "batchHeaderHash", batchHeaderHashHex, "err", deleteKeysErr)
}
}
return nil, fmt.Errorf("failed to validate batch: %w", err)
return nil, err
}
n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer))
log.Debug("Validate batch took", "duration:", time.Since(stageTimer))
@@ -395,7 +396,20 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob
     }
 
     pool := workerpool.New(n.Config.NumBatchValidators)
-    return n.Validator.ValidateBatch(header, blobs, operatorState, pool)
+    err = n.Validator.ValidateBatch(header, blobs, operatorState, pool)
+    if err != nil {
+        h, hashErr := operatorState.Hash()
+        if hashErr != nil {
+            n.Logger.Error("failed to get operator state hash", "err", hashErr)
+        }
+
+        hStr := make([]string, 0, len(h))
+        for q, hash := range h {
+            hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash))
+        }
+        return fmt.Errorf("failed to validate batch with operator state %s: %w", strings.Join(hStr, ","), err)
+    }
+    return nil
 }
 
 func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) {
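Both new call sites (disperser/batcher/batcher.go and node/node.go) render the per-quorum hash map into strings the same way; a small shared helper along these lines (hypothetical, not part of this commit) could remove the duplication:

    // formatStateHashes is a hypothetical helper: it renders a per-quorum hash
    // map as "quorum: digest" strings suitable for logging or error messages.
    func formatStateHashes(h map[core.QuorumID][16]byte) []string {
        out := make([]string, 0, len(h))
        for q, hash := range h {
            out = append(out, fmt.Sprintf("%d: %x", q, hash))
        }
        return out
    }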
