Skip to content

Commit

Permalink
Merge branch 'staging' into 1082-add-a-bitmap-in-ethereum-to-disable-…
Browse files Browse the repository at this point in the history
…verifiers
  • Loading branch information
IAvecilla committed Oct 1, 2024
2 parents 88fc2e0 + decd1c7 commit ee0d97d
Show file tree
Hide file tree
Showing 19 changed files with 299 additions and 190 deletions.
16 changes: 16 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ update_operator:
@make build_operator
@./operator/build/aligned-operator --version

operator_valid_marshall_fuzz_macos:
@cd operator/pkg && go test -fuzz=FuzzValidMarshall -ldflags=-extldflags=-Wl,-ld_classic

operator_valid_marshall_fuzz_linux:
@cd operator/pkg && \
LD_LIBRARY_PATH=$(LD_LIBRARY_PATH):$(CURDIR)/operator/risc_zero/lib \
go test -fuzz=FuzzValidMarshall

operator_marshall_unmarshall_fuzz_macos:
@cd operator/pkg && go test -fuzz=FuzzMarshalUnmarshal -ldflags=-extldflags=-Wl,-ld_classic

operator_marshall_unmarshall_fuzz_linux:
@cd operator/pkg && \
LD_LIBRARY_PATH=$(LD_LIBRARY_PATH):$(CURDIR)/operator/risc_zero/lib \
go test -fuzz=FuzzMarshalUnmarshal

bindings:
cd contracts && ./generate-go-bindings.sh

Expand Down
20 changes: 1 addition & 19 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores if an operator already submitted a response for a batch
// This is to avoid double submissions
// struct{} is used as a placeholder because it is the smallest type
// go does not have a set type
operatorRespondedBatch map[uint32]map[eigentypes.Bytes32]struct{}

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand Down Expand Up @@ -146,7 +140,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
return taskResponseDigest, nil
}

operatorPubkeysService := oppubkeysserv.NewOperatorsInfoServiceInMemory(context.Background(), clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, nil, logger)
operatorPubkeysService := oppubkeysserv.NewOperatorsInfoServiceInMemory(context.Background(), clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, nil, oppubkeysserv.Opts{}, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader.ChainReader, operatorPubkeysService, logger)
blsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, hashFunction, logger)

Expand All @@ -167,7 +161,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
operatorRespondedBatch: make(map[uint32]map[eigentypes.Bytes32]struct{}),
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -220,12 +213,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.taskMutex.Lock()
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err, "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
agg.logger.Info("- Locking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)

// Remove task from the list of tasks
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)

agg.logger.Info("- Unlocking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)
agg.taskMutex.Unlock()
return
}
Expand Down Expand Up @@ -254,10 +241,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]

// Delete the task from the map
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
agg.taskMutex.Unlock()

Expand All @@ -282,7 +265,6 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down
26 changes: 1 addition & 25 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"net/rpc"
"time"

eigentypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/yetanotherco/aligned_layer/core/types"
)

Expand Down Expand Up @@ -74,26 +72,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
return nil
}

// Note: we already have lock here
agg.logger.Debug("- Checking if operator already responded")
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
if !ok {
batchResponses = make(map[eigentypes.Bytes32]struct{})
agg.operatorRespondedBatch[taskIndex] = batchResponses
}

if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
*reply = 0
agg.logger.Warn("Operator already responded, ignoring",
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))

agg.taskMutex.Unlock()
return nil
}

batchResponses[signedTaskResponse.OperatorId] = struct{}{}

// Don't wait infinitely if it can't answer
// Create a context with a timeout of 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -111,9 +89,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t

if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
// remove operator from the list of operators that responded
// so that it can try again
delete(batchResponses, signedTaskResponse.OperatorId)
// todo shouldn't we here close the channel with a reply = 1?
} else {
agg.logger.Info("BLS process succeeded")
}
Expand Down
28 changes: 28 additions & 0 deletions common/proving_systems.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"log"

"github.com/fxamacker/cbor/v2"
)

type ProvingSystemId uint16
Expand Down Expand Up @@ -96,6 +98,32 @@ func (t *ProvingSystemId) UnmarshalBinary(data []byte) error {
return err
}

func (s *ProvingSystemId) UnmarshalCBOR(data []byte) error {
var statusStr string
if err := cbor.Unmarshal(data, &statusStr); err != nil {
return err
}

switch statusStr {
case "GnarkPlonkBls12_381":
*s = GnarkPlonkBls12_381
case "GnarkPlonkBn254":
*s = GnarkPlonkBn254
case "Groth16Bn254":
*s = Groth16Bn254
case "SP1":
*s = SP1
case "Halo2KZG":
*s = Halo2KZG
case "Halo2IPA":
*s = Halo2IPA
case "Risc0":
*s = Risc0
}

return nil
}

func (t ProvingSystemId) MarshalBinary() ([]byte, error) {
// needs to be defined but should never be called
return nil, fmt.Errorf("not implemented")
Expand Down
4 changes: 2 additions & 2 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func NewAvsReaderFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
}

func (r *AvsReader) GetErc20Mock(tokenAddr gethcommon.Address) (*contractERC20Mock.ContractERC20Mock, error) {
erc20Mock, err := contractERC20Mock.NewContractERC20Mock(tokenAddr, r.AvsContractBindings.ethClient)
erc20Mock, err := contractERC20Mock.NewContractERC20Mock(tokenAddr, &r.AvsContractBindings.ethClient)
if err != nil {
// Retry with fallback client
erc20Mock, err = contractERC20Mock.NewContractERC20Mock(tokenAddr, r.AvsContractBindings.ethClientFallback)
erc20Mock, err = contractERC20Mock.NewContractERC20Mock(tokenAddr, &r.AvsContractBindings.ethClientFallback)
if err != nil {
r.logger.Error("Failed to fetch ERC20Mock contract", "err", err)
}
Expand Down
5 changes: 2 additions & 3 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type AvsWriter struct {
AvsContractBindings *AvsServiceBindings
logger logging.Logger
Signer signer.Signer
Client eth.Client
ClientFallback eth.Client
Client eth.InstrumentedClient
ClientFallback eth.InstrumentedClient
}

func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.EcdsaConfig) (*AvsWriter, error) {
Expand Down Expand Up @@ -119,7 +119,6 @@ func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bin
// Proceed to check values against simulated costs
w.logger.Error("Failed to get batch state", "error", err)
w.logger.Info("Proceeding with simulated cost checks")

return w.compareBalances(simulatedCost, aggregatorAddress, senderAddress)
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/chainio/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import (
type AvsServiceBindings struct {
ServiceManager *csservicemanager.ContractAlignedLayerServiceManager
ServiceManagerFallback *csservicemanager.ContractAlignedLayerServiceManager
ethClient eth.Client
ethClientFallback eth.Client
ethClient eth.InstrumentedClient
ethClientFallback eth.InstrumentedClient
logger logging.Logger
}

func NewAvsServiceBindings(serviceManagerAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethClient eth.Client, ethClientFallback eth.Client, logger logging.Logger) (*AvsServiceBindings, error) {
contractServiceManager, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, ethClient)
func NewAvsServiceBindings(serviceManagerAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethClient eth.InstrumentedClient, ethClientFallback eth.InstrumentedClient, logger logging.Logger) (*AvsServiceBindings, error) {
contractServiceManager, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, &ethClient)
if err != nil {
logger.Error("Failed to fetch AlignedLayerServiceManager contract", "err", err)
return nil, err
}

contractServiceManagerFallback, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, ethClientFallback)
contractServiceManagerFallback, err := csservicemanager.NewContractAlignedLayerServiceManager(serviceManagerAddr, &ethClientFallback)
if err != nil {
logger.Error("Failed to fetch AlignedLayerServiceManager contract", "err", err)
return nil, err
Expand Down
35 changes: 22 additions & 13 deletions core/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls"
sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
)

Expand All @@ -27,10 +29,10 @@ type BaseConfig struct {
Logger sdklogging.Logger
EthRpcUrl string
EthWsUrl string
EthRpcClient eth.Client
EthRpcClientFallback eth.Client
EthWsClient eth.Client
EthWsClientFallback eth.Client
EthRpcClient eth.InstrumentedClient
EthRpcClientFallback eth.InstrumentedClient
EthWsClient eth.InstrumentedClient
EthWsClientFallback eth.InstrumentedClient
EigenMetricsIpPortAddress string
ChainId *big.Int
}
Expand Down Expand Up @@ -96,12 +98,15 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
log.Fatal("Eth ws url or fallback is empty")
}

ethWsClient, err := eth.NewClient(baseConfigFromYaml.EthWsUrl)
reg := prometheus.NewRegistry()
rpcCallsCollector := rpccalls.NewCollector("ethWs", reg)
ethWsClient, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthWsUrl, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth ws client: ", err)
}

ethWsClientFallback, err := eth.NewClient(baseConfigFromYaml.EthWsUrlFallback)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethWsFallback", reg)
ethWsClientFallback, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthWsUrlFallback, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth ws client fallback: ", err)
}
Expand All @@ -110,12 +115,16 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
log.Fatal("Eth rpc url is empty")
}

ethRpcClient, err := eth.NewClient(baseConfigFromYaml.EthRpcUrl)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethRpc", reg)
ethRpcClient, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthRpcUrl, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth rpc client: ", err)
}

ethRpcClientFallback, err := eth.NewClient(baseConfigFromYaml.EthRpcUrlFallback)
reg = prometheus.NewRegistry()
rpcCallsCollector = rpccalls.NewCollector("ethRpc", reg)
ethRpcClientFallback, err := eth.NewInstrumentedClient(baseConfigFromYaml.EthRpcUrlFallback, rpcCallsCollector)
if err != nil {
log.Fatal("Error initializing eth rpc client fallback: ", err)
}
Expand All @@ -136,10 +145,10 @@ func NewBaseConfig(configFilePath string) *BaseConfig {
Logger: logger,
EthRpcUrl: baseConfigFromYaml.EthRpcUrl,
EthWsUrl: baseConfigFromYaml.EthWsUrl,
EthRpcClient: ethRpcClient,
EthRpcClientFallback: ethRpcClientFallback,
EthWsClient: ethWsClient,
EthWsClientFallback: ethWsClientFallback,
EthRpcClient: *ethRpcClient,
EthRpcClientFallback: *ethRpcClientFallback,
EthWsClient: *ethWsClient,
EthWsClientFallback: *ethWsClientFallback,
EigenMetricsIpPortAddress: baseConfigFromYaml.EigenMetricsIpPortAddress,
ChainId: chainId,
}
Expand Down
2 changes: 1 addition & 1 deletion core/utils/eth_client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const maxRetries = 25
const sleepTime = 5 * time.Second

func WaitForTransactionReceipt(client eth.Client, ctx context.Context, txHash gethcommon.Hash) (*types.Receipt, error) {
func WaitForTransactionReceipt(client eth.InstrumentedClient, ctx context.Context, txHash gethcommon.Hash) (*types.Receipt, error) {
for i := 0; i < maxRetries; i++ {
receipt, err := client.TransactionReceipt(ctx, txHash)
if err != nil {
Expand Down
Loading

0 comments on commit ee0d97d

Please sign in to comment.