Skip to content

Commit

Permalink
Merge branch 'staging' into 1415-send-signed-task-response-to-aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
PatStiles authored Nov 21, 2024
2 parents 2a6d5fb + 439a6a0 commit 7d6abf7
Show file tree
Hide file tree
Showing 24 changed files with 1,037 additions and 235 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.10.3
OPERATOR_VERSION=v0.11.2

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand Down
25 changes: 18 additions & 7 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ func (agg *Aggregator) Start(ctx context.Context) error {
const MaxSentTxRetries = 5

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
defer func() {
err := recover() //stops panics
if err != nil {
agg.logger.Error("handleBlsAggServiceResponse recovered from panic", "err", err)
}
}()

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
Expand Down Expand Up @@ -277,10 +284,15 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
}

agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]), "merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]))
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
// In some cases, we may fail to retrieve the receipt for the transaction.
txHash := "Unknown"
if receipt != nil {
txHash = receipt.TxHash.String()
}
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
Expand Down Expand Up @@ -384,8 +396,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
err := agg.InitializeNewTaskRetryable(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout)
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}
Expand All @@ -399,7 +410,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
// |---RETRYABLE---|

/*
InitializeNewTask
InitializeNewTaskRetryable
Initialize a new task in the BLS Aggregation service
- Errors:
Permanent:
Expand All @@ -408,7 +419,7 @@ Initialize a new task in the BLS Aggregation service
- All others.
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
*/
func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
initializeNewTask_func := func() error {
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
if err != nil {
Expand All @@ -429,7 +440,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
defer func() {
err := recover() //stops panics
if err != nil {
agg.logger.Error("Recovered from panic", "err", err)
agg.logger.Error("ClearTasksFromMaps Recovered from panic", "err", err)
}
}()

Expand Down
48 changes: 3 additions & 45 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"fmt"
"net/http"
"net/rpc"
"strings"
"time"

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/types"
)
Expand Down Expand Up @@ -72,7 +69,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t

agg.logger.Info("Starting bls signature process")
go func() {
err := agg.ProcessNewSignatureRetryable(
err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)
Expand All @@ -99,9 +96,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
*reply = 0
}

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
agg.taskMutex.Unlock()

return nil
}

Expand All @@ -112,48 +106,12 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
return nil
}

// |---RETRYABLE---|

// TODO: Add Retryable
/*
- Errors:
Permanent:
- SignatureVerificationError: Verification of the sigature within the BLS Aggregation Service failed. (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L42).
Transient:
- All others.
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
- NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown.
*/
func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
processNewSignature_func := func() error {
err := agg.blsAggregationService.ProcessNewSignature(
ctx, taskIndex, taskResponse,
blsSignature, operatorId,
)
if err != nil {
if strings.Contains(err.Error(), "Failed to verify signature") {
err = retry.PermanentError{Inner: err}
}
}
return err
}

return retry.Retry(processNewSignature_func, retry.ChainRetryConfig())
}

// Checks Internal mapping for Signed Task Response, returns its TaskIndex.
/*
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
*/
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
agg.taskMutex.Unlock()
if !ok {
agg.taskMutex.Unlock()
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
return taskIndex, fmt.Errorf("Task not found in the internal map")
} else {
return taskIndex, nil
Expand Down
2 changes: 1 addition & 1 deletion batcher/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7d6abf7

Please sign in to comment.