From d8259802ccd109b048a20e59f9adeccbaf798fa6 Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Thu, 30 May 2024 02:57:10 +0900 Subject: [PATCH] [node] Log operator state hash on validation failure (#584) --- core/state.go | 43 ++++++++++++++++ core/state_test.go | 98 ++++++++++++++++++++++++++++++++++++ disperser/batcher/batcher.go | 9 ++++ node/node.go | 18 ++++++- 4 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 core/state_test.go diff --git a/core/state.go b/core/state.go index 62fc4e2ff7..7f8092eaaf 100644 --- a/core/state.go +++ b/core/state.go @@ -2,8 +2,11 @@ package core import ( "context" + "crypto/md5" + "encoding/json" "fmt" "math/big" + "slices" "strings" ) @@ -60,6 +63,46 @@ type OperatorState struct { BlockNumber uint } +func (s *OperatorState) Hash() (map[QuorumID][16]byte, error) { + res := make(map[QuorumID][16]byte) + type operatorInfoWithID struct { + OperatorID string + Stake string + Index uint + } + for quorumID, opInfos := range s.Operators { + marshalable := struct { + Operators []operatorInfoWithID + Totals OperatorInfo + BlockNumber uint + }{ + Operators: make([]operatorInfoWithID, 0, len(opInfos)), + Totals: OperatorInfo{}, + BlockNumber: s.BlockNumber, + } + + for opID, opInfo := range opInfos { + marshalable.Operators = append(marshalable.Operators, operatorInfoWithID{ + OperatorID: opID.Hex(), + Stake: opInfo.Stake.String(), + Index: uint(opInfo.Index), + }) + } + slices.SortStableFunc(marshalable.Operators, func(a, b operatorInfoWithID) int { + return strings.Compare(a.OperatorID, b.OperatorID) + }) + + marshalable.Totals = *s.Totals[quorumID] + data, err := json.Marshal(marshalable) + if err != nil { + return nil, err + } + res[quorumID] = md5.Sum(data) + } + + return res, nil +} + // IndexedOperatorInfo contains information about an operator which is contained in events from the EigenDA smart contracts. Note that // this information does not depend on the quorum. type IndexedOperatorInfo struct { diff --git a/core/state_test.go b/core/state_test.go new file mode 100644 index 0000000000..f8f0697458 --- /dev/null +++ b/core/state_test.go @@ -0,0 +1,98 @@ +package core_test + +import ( + "encoding/hex" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + "github.com/stretchr/testify/assert" +) + +func TestOperatorStateHash(t *testing.T) { + s1 := core.OperatorState{ + Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ + 0: { + [32]byte{0}: &core.OperatorInfo{ + Stake: big.NewInt(12), + Index: uint(2), + }, + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + }, + 1: { + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + [32]byte{2}: &core.OperatorInfo{ + Stake: big.NewInt(34), + Index: uint(4), + }, + }, + }, + Totals: map[core.QuorumID]*core.OperatorInfo{ + 0: { + Stake: big.NewInt(35), + Index: uint(2), + }, + 1: { + Stake: big.NewInt(57), + Index: uint(2), + }, + }, + BlockNumber: uint(123), + } + + hash1, err := s1.Hash() + assert.NoError(t, err) + q0 := hash1[0] + q1 := hash1[1] + assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:])) + assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) + + s2 := core.OperatorState{ + Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ + 0: { + [32]byte{0}: &core.OperatorInfo{ + Stake: big.NewInt(12), + Index: uint(3), // different from s1 + }, + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + }, + 1: { + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + [32]byte{2}: &core.OperatorInfo{ + Stake: big.NewInt(34), + Index: uint(4), + }, + }, + }, + Totals: map[core.QuorumID]*core.OperatorInfo{ + 0: { + Stake: big.NewInt(35), + Index: uint(2), + }, + 1: { + Stake: big.NewInt(57), + Index: uint(2), + }, + }, + BlockNumber: uint(123), + } + + hash2, err := s2.Hash() + assert.NoError(t, err) + q0 = hash2[0] + q1 = hash2[1] + assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:])) + assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) +} diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 56cf352f53..738deeb588 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -420,6 +420,15 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { stageTimer = time.Now() update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer)) + h, err := batch.State.OperatorState.Hash() + if err != nil { + log.Error("HandleSingleBatch: error getting operator state hash", "err", err) + } + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + log.Info("Dispatched encoded batch", "operatorStateHash", hStr) // Get the batch header hash log.Debug("Getting batch header hash...") diff --git a/node/node.go b/node/node.go index a39f6dcc8b..321a90f2ae 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "os" + "strings" "sync" "time" @@ -360,7 +361,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs log.Error("Failed to delete the invalid batch that should be rolled back", "batchHeaderHash", batchHeaderHashHex, "err", deleteKeysErr) } } - return nil, fmt.Errorf("failed to validate batch: %w", err) + return nil, err } n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer)) log.Debug("Validate batch took", "duration:", time.Since(stageTimer)) @@ -395,7 +396,20 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob } pool := workerpool.New(n.Config.NumBatchValidators) - return n.Validator.ValidateBatch(header, blobs, operatorState, pool) + err = n.Validator.ValidateBatch(header, blobs, operatorState, pool) + if err != nil { + h, hashErr := operatorState.Hash() + if hashErr != nil { + n.Logger.Error("failed to get operator state hash", "err", hashErr) + } + + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + return fmt.Errorf("failed to validate batch with operator state %x: %w", strings.Join(hStr, ","), err) + } + return nil } func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) {