Skip to content

Commit

Permalink
fix: operator double signature (#1088)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosNicolau authored Oct 1, 2024
1 parent d128869 commit 6b1a4ae
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 83 deletions.
20 changes: 1 addition & 19 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores if an operator already submitted a response for a batch
// This is to avoid double submissions
// struct{} is used as a placeholder because it is the smallest type
// go does not have a set type
operatorRespondedBatch map[uint32]map[eigentypes.Bytes32]struct{}

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand Down Expand Up @@ -146,7 +140,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
return taskResponseDigest, nil
}

operatorPubkeysService := oppubkeysserv.NewOperatorsInfoServiceInMemory(context.Background(), clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, nil, logger)
operatorPubkeysService := oppubkeysserv.NewOperatorsInfoServiceInMemory(context.Background(), clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, nil, oppubkeysserv.Opts{}, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader.ChainReader, operatorPubkeysService, logger)
blsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, hashFunction, logger)

Expand All @@ -167,7 +161,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
operatorRespondedBatch: make(map[uint32]map[eigentypes.Bytes32]struct{}),
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -220,12 +213,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.taskMutex.Lock()
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err, "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
agg.logger.Info("- Locking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)

// Remove task from the list of tasks
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)

agg.logger.Info("- Unlocking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)
agg.taskMutex.Unlock()
return
}
Expand Down Expand Up @@ -254,10 +241,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]

// Delete the task from the map
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
agg.taskMutex.Unlock()

Expand All @@ -282,7 +265,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down
26 changes: 1 addition & 25 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"net/rpc"
"time"

eigentypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/yetanotherco/aligned_layer/core/types"
)

Expand Down Expand Up @@ -74,26 +72,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
return nil
}

// Note: we already have lock here
agg.logger.Debug("- Checking if operator already responded")
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
if !ok {
batchResponses = make(map[eigentypes.Bytes32]struct{})
agg.operatorRespondedBatch[taskIndex] = batchResponses
}

if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
*reply = 0
agg.logger.Warn("Operator already responded, ignoring",
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))

agg.taskMutex.Unlock()
return nil
}

batchResponses[signedTaskResponse.OperatorId] = struct{}{}

// Don't wait infinitely if it can't answer
// Create a context with a timeout of 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -111,9 +89,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t

if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
// remove operator from the list of operators that responded
// so that it can try again
delete(batchResponses, signedTaskResponse.OperatorId)
// todo shouldn't we here close the channel with a reply = 1?
} else {
agg.logger.Info("BLS process succeeded")
}
Expand Down
4 changes: 2 additions & 2 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func NewAvsReaderFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
}

func (r *AvsReader) GetErc20Mock(tokenAddr gethcommon.Address) (*contractERC20Mock.ContractERC20Mock, error) {
erc20Mock, err := contractERC20Mock.NewContractERC20Mock(tokenAddr, r.AvsContractBindings.ethClient)
erc20Mock, err := contractERC20Mock.NewContractERC20Mock(tokenAddr, &r.AvsContractBindings.ethClient)
if err != nil {
// Retry with fallback client
erc20Mock, err = contractERC20Mock.NewContractERC20Mock(tokenAddr, r.AvsContractBindings.ethClientFallback)
erc20Mock, err = contractERC20Mock.NewContractERC20Mock(tokenAddr, &r.AvsContractBindings.ethClientFallback)
if err != nil {
r.logger.Error("Failed to fetch ERC20Mock contract", "err", err)
}
Expand Down
5 changes: 2 additions & 3 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type AvsWriter struct {
AvsContractBindings *AvsServiceBindings
logger logging.Logger
Signer signer.Signer
Client eth.Client
ClientFallback eth.Client
Client eth.InstrumentedClient
ClientFallback eth.InstrumentedClient
}

func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.EcdsaConfig) (*AvsWriter, error) {
Expand Down Expand Up @@ -119,7 +119,6 @@ func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bin
// Proceed to check values against simulated costs
w.logger.Error("Failed to get batch state", "error", err)
w.logger.Info("Proceeding with simulated cost checks")

return w.compareBalances(simulatedCost, aggregatorAddress, senderAddress)
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/chainio/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import (
type AvsServiceBindings struct {
ServiceManager *csservicemanager.ContractAlignedLayerServiceManager
ServiceManagerFallback *csservicemanager.ContractAlignedLayerServiceManager
ethClient eth.Client
ethClientFallback eth.Client
ethClient eth.InstrumentedClient
ethClientFallback eth.InstrumentedClient
logger logging.Logger
}

func NewAvsServiceBindings(serviceManagerAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethClient eth.Client, ethClientFallback eth.Client, logger logging.Logger) (*AvsServiceBindings, error) {
contractServiceManager, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, ethClient)
func NewAvsServiceBindings(serviceManagerAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethClient eth.InstrumentedClient, ethClientFallback eth.InstrumentedClient, logger logging.Logger) (*AvsServiceBindings, error) {
contractServiceManager, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, &ethClient)
if err != nil {
logger.Error("Failed to fetch AlignedLayerServiceManager contract", "err", err)
return nil, err
}

contractServiceManagerFallback, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, ethClientFallback)
contractServiceManagerFallback, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, &ethClientFallback)
if err != nil {
logger.Error("Failed to fetch AlignedLayerServiceManager contract", "err", err)
return nil, err
Expand Down
35 changes: 22 additions & 13 deletions core/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls"
sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
)

Expand All @@ -27,10 +29,10 @@ type BaseConfig struct {
Logger sdklogging.Logger
EthRpcUrl string
EthWsUrl string
EthRpcClient eth.Client
EthRpcClientFallback eth.Client
EthWsClient eth.Client
EthWsClientFallback eth.Client
EthRpcClient eth.InstrumentedClient
EthRpcClientFallback eth.InstrumentedClient
EthWsClient eth.InstrumentedClient
EthWsClientFallback eth.InstrumentedClient
EigenMetricsIpPortAddress string
ChainId *big.Int
}
Expand Down Expand Up @@ -96,12 +98,15 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
log.Fatal("Eth ws url or fallback is empty")
}

ethWsClient, err := eth.NewClient(baseConfigFromYaml.EthWsUrl)
reg := prometheus.NewRegistry()
rpcCallsCollector := rpccalls.NewCollector("ethWs", reg)
ethWsClient, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthWsUrl, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth ws client: ", err)
}

ethWsClientFallback, err := eth.NewClient(baseConfigFromYaml.EthWsUrlFallback)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethWsFallback", reg)
ethWsClientFallback, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthWsUrlFallback, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth ws client fallback: ", err)
}
Expand All @@ -110,12 +115,16 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
log.Fatal("Eth rpc url is empty")
}

ethRpcClient, err := eth.NewClient(baseConfigFromYaml.EthRpcUrl)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethRpc", reg)
ethRpcClient, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthRpcUrl, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth rpc client: ", err)
}

ethRpcClientFallback, err := eth.NewClient(baseConfigFromYaml.EthRpcUrlFallback)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethRpc", reg)
ethRpcClientFallback, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthRpcUrlFallback, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth rpc client fallback: ", err)
}
Expand All @@ -136,10 +145,10 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
Logger: logger,
EthRpcUrl: baseConfigFromYaml.EthRpcUrl,
EthWsUrl: baseConfigFromYaml.EthWsUrl,
EthRpcClient: ethRpcClient,
EthRpcClientFallback: ethRpcClientFallback,
EthWsClient: ethWsClient,
EthWsClientFallback: ethWsClientFallback,
EthRpcClient: *ethRpcClient,
EthRpcClientFallback: *ethRpcClientFallback,
EthWsClient: *ethWsClient,
EthWsClientFallback: *ethWsClientFallback,
EigenMetricsIpPortAddress: baseConfigFromYaml.EigenMetricsIpPortAddress,
ChainId: chainId,
}
Expand Down
2 changes: 1 addition & 1 deletion core/utils/eth_client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const maxRetries = 25
const sleepTime = 5 * time.Second

func WaitForTransactionReceipt(client eth.Client, ctx context.Context, txHash gethcommon.Hash) (*types.Receipt, error) {
func WaitForTransactionReceipt(client eth.InstrumentedClient, ctx context.Context, txHash gethcommon.Hash) (*types.Receipt, error) {
for i := 0; i < maxRetries; i++ {
receipt, err := client.TransactionReceipt(ctx, txHash)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/yetanotherco/aligned_layer
go 1.22.2

require (
github.com/Layr-Labs/eigensdk-go v0.1.9
github.com/Layr-Labs/eigensdk-go v0.1.12
github.com/ethereum/go-ethereum v1.14.0
github.com/prometheus/client_golang v1.19.1
github.com/urfave/cli/v2 v2.27.1
Expand Down Expand Up @@ -91,5 +91,3 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

replace github.com/Layr-Labs/eigensdk-go => github.com/yetanotherco/eigensdk-go v0.1.10-0.20240805154752-29f4d3457921
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Layr-Labs/eigensdk-go v0.1.12 h1:Drf59iJLvnTm2Om9AwAyUMiZeJaTI8ZameIrnhjopSY=
github.com/Layr-Labs/eigensdk-go v0.1.12/go.mod h1:XcLVDtlB1vOPj63D236b451+SC75B8gwgkpNhYHSxNs=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down Expand Up @@ -315,8 +317,6 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yetanotherco/eigensdk-go v0.1.10-0.20240805154752-29f4d3457921 h1:bYER70hS8+Qx//uWWhm0+WymLTKHiqfbJkphteKUxPc=
github.com/yetanotherco/eigensdk-go v0.1.10-0.20240805154752-29f4d3457921/go.mod h1:XcLVDtlB1vOPj63D236b451+SC75B8gwgkpNhYHSxNs=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
Expand Down
8 changes: 4 additions & 4 deletions operator/cmd/actions/deposit_into_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ func depositIntoStrategyMain(ctx *cli.Context) error {
if err != nil {
return err
}
w, err := wallet.NewPrivateKeyWallet(config.BaseConfig.EthRpcClient, signerFn,
w, err := wallet.NewPrivateKeyWallet(&config.BaseConfig.EthRpcClient, signerFn,
config.Operator.Address, config.BaseConfig.Logger)

if err != nil {
return err
}

txMgr := txmgr.NewSimpleTxManager(w, config.BaseConfig.EthRpcClient, config.BaseConfig.Logger,
txMgr := txmgr.NewSimpleTxManager(w, &config.BaseConfig.EthRpcClient, config.BaseConfig.Logger,
config.Operator.Address)
eigenMetrics := metrics.NewNoopMetrics()
eigenLayerWriter, err := elcontracts.BuildELChainWriter(delegationManagerAddr, avsDirectoryAddr,
config.BaseConfig.EthRpcClient, config.BaseConfig.Logger, eigenMetrics, txMgr)
&config.BaseConfig.EthRpcClient, config.BaseConfig.Logger, eigenMetrics, txMgr)
if err != nil {
return err
}

_, err = eigenLayerWriter.DepositERC20IntoStrategy(context.Background(), strategyAddr, amount)
_, err = eigenLayerWriter.DepositERC20IntoStrategy(context.Background(), strategyAddr, amount, true)
if err != nil {
config.BaseConfig.Logger.Errorf("Error depositing into strategy")
return err
Expand Down
9 changes: 3 additions & 6 deletions operator/pkg/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package operator

import (
"context"
"math/big"
"time"

"github.com/Layr-Labs/eigensdk-go/types"
"github.com/yetanotherco/aligned_layer/core/chainio"
Expand All @@ -25,14 +23,13 @@ func RegisterOperator(
return err
}

operatorToAvsRegistrationSigExpiry := big.NewInt(time.Now().Add(10 * time.Minute).Unix())
socket := "Not Needed"

quorumNumbers := types.QuorumNums{0}

_, err = writer.RegisterOperatorInQuorumWithAVSRegistryCoordinator(ctx, configuration.EcdsaConfig.PrivateKey,
operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, configuration.BlsConfig.KeyPair,
quorumNumbers, socket)
_, err = writer.RegisterOperator(ctx, configuration.EcdsaConfig.PrivateKey,
configuration.BlsConfig.KeyPair,
quorumNumbers, socket, true)

if err != nil {
configuration.BaseConfig.Logger.Error("Failed to register operator", "err", err)
Expand Down

0 comments on commit 6b1a4ae

Please sign in to comment.