Skip to content

Commit

Permalink
Merge branch 'protocolResponseHandling' into 'main'
Browse files Browse the repository at this point in the history
Protocol response handling

See merge request flarenetwork/flare-system-client!54
  • Loading branch information
GrePod committed Dec 17, 2024
2 parents 90498b9 + 8a2ba2f commit b7deb36
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 138 deletions.
12 changes: 6 additions & 6 deletions client/epoch/epoch_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ func (c *client) signRewards(epochId *big.Int) {
return nil, nil
}

logger.Info("Signing rewards for epoch %v, attempt %d", epochId, i)
logger.Infof("Signing rewards for epoch %v, attempt %d", epochId, i)

data, err := fetchRewardData(epochId, c.rewardsConfig)
if data == nil {
return nil, errors.New("no reward data found")
}
if err != nil {
return nil, errors.Wrapf(err, "unable to fetch reward data for epoch %d", epochId)
}
if data == nil {
return nil, errors.New("no reward data found")
}
hash, weightClaims, err := verifyRewardData(epochId, c.identityAddress, data, c.rewardsConfig)
if err != nil {
return nil, errors.Wrapf(err, "reward data verification for epoch %d failed", epochId)
Expand All @@ -262,9 +262,9 @@ func (c *client) signRewards(epochId *big.Int) {
go func() {
status := <-res
if status.Success {
logger.Info("Signing rewards for epoch %v completed", epochId)
logger.Infof("Signing rewards for epoch %v completed", epochId)
} else {
logger.Info("Signing rewards for epoch %v failed: %s", epochId, status.Message)
logger.Infof("Signing rewards for epoch %v failed: %s", epochId, status.Message)
}
}()
}
34 changes: 21 additions & 13 deletions client/epoch/rewards_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package epoch
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"net/http"
"net/url"
"time"

"github.com/flare-foundation/flare-system-client/client/config"
Expand All @@ -23,6 +23,9 @@ import (
"github.com/flare-foundation/go-flare-common/pkg/merkle"
)

const timeout = 5 * time.Second // maximal duration for fetching of the reward data
const maxRespSize = 100 * (1 << 20) // 100 MB for maximal response size of the reward

var (
uint8Type, _ = abi.NewType("uint8", "", nil)
uint64Type, _ = abi.NewType("uint64", "", nil)
Expand Down Expand Up @@ -121,11 +124,16 @@ func fetchRewardData(epochId *big.Int, config *config.RewardsConfig) (*rewardDis
return nil, errors.New("reward data url prefix not set")
}

url := fmt.Sprintf("%s/%d/reward-distribution-data.json", config.UrlPrefix, epochId)
rewardsUrl, err := url.JoinPath(config.UrlPrefix, epochId.Text(10), "reward-distribution-data.json")
if err != nil {
return nil, errors.Errorf("cannot join url: %s", err)
}

logger.Infof("Fetching reward data at: %s", rewardsUrl)
result := <-shared.ExecuteWithRetryChan(func() (*rewardDistributionData, error) {
client := &http.Client{Timeout: timeout}

logger.Info("Fetching reward data at: %s", url)
result := <-shared.ExecuteWithRetryChan(func() ([]byte, error) {
resp, err := http.Get(url)
resp, err := client.Get(rewardsUrl)
if err != nil {
return nil, err
}
Expand All @@ -138,23 +146,23 @@ func fetchRewardData(epochId *big.Int, config *config.RewardsConfig) (*rewardDis
return nil, errors.Errorf("unexpected status code: %s", resp.Status)
}

bytes, err := io.ReadAll(resp.Body)
respLimited := &io.LimitedReader{R: resp.Body, N: maxRespSize}

decoder := json.NewDecoder(respLimited)

var rewardData rewardDistributionData
err = decoder.Decode(&rewardData)
if err != nil {
return nil, err
}
return bytes, nil
return &rewardData, nil
}, 3, 1*time.Second)

if !result.Success {
return nil, errors.Errorf("unable to fetch reward data")
}

var rewardData rewardDistributionData
err := json.Unmarshal(result.Value, &rewardData)
if err != nil {
return nil, errors.Wrap(err, "unable to parse reward data")
}
return &rewardData, nil
return result.Value, nil
}

func verifyRewardData(epochId *big.Int, identity common.Address, data *rewardDistributionData, rewardsConfig *config.RewardsConfig) (*common.Hash, int, error) {
Expand Down
12 changes: 6 additions & 6 deletions client/epoch/system_manager_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *systemsManagerContractClientImpl) sendSignNewSigningPolicy(rewardEpochI
)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignNewSigningPolicyErrors, err.Error()) {
logger.Infof("Non fatal error dry run sign new signing policy: %v", err)
logger.Debugf("Non fatal error dry run sign new signing policy: %v", err)
return nil
}
logger.Warnf("Dry run fail: %v", err)
Expand All @@ -138,7 +138,7 @@ func (s *systemsManagerContractClientImpl) sendSignNewSigningPolicy(rewardEpochI
tx, err := s.flareSystemsManager.SignNewSigningPolicy(s.senderTxOpts, rewardEpochId, [32]byte(newSigningPolicyHash), signature)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignNewSigningPolicyErrors, err.Error()) {
logger.Infof("Non fatal error sending sign new signing policy: %v", err)
logger.Debugf("Non fatal error sending sign new signing policy: %v", err)
return nil
}
return err
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *systemsManagerContractClientImpl) sendSignUptimeVote(rewardEpochId *big
)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignUptimeVoteErrors, err.Error()) {
logger.Infof("Non fatal error dryRun sign uptime vote: %v", err)
logger.Debugf("Non fatal error dryRun sign uptime vote: %v", err)
return nil
}
logger.Warnf("Dry run fail: %v", err)
Expand All @@ -304,7 +304,7 @@ func (s *systemsManagerContractClientImpl) sendSignUptimeVote(rewardEpochId *big
tx, err := s.flareSystemsManager.SignUptimeVote(s.senderTxOpts, rewardEpochId, hash, *signature)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignUptimeVoteErrors, err.Error()) {
logger.Infof("Non fatal error sending sign uptime vote: %v", err)
logger.Debugf("Non fatal error sending sign uptime vote: %v", err)
return nil
}
return err
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s *systemsManagerContractClientImpl) sendSignRewards(epochId *big.Int, rew
)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignRewardsErrors, err.Error()) {
logger.Infof("Non fatal error dry run reward signature: %v", err)
logger.Debugf("Non fatal error dry run reward signature: %v", err)
return nil
}
logger.Warnf("Dry run fail: %v", err)
Expand All @@ -429,7 +429,7 @@ func (s *systemsManagerContractClientImpl) sendSignRewards(epochId *big.Int, rew
tx, err := s.flareSystemsManager.SignRewards(s.senderTxOpts, epochId, numberOfWeightBasedClaims, *rewardHash, signature)
if err != nil {
if shared.ExistsAsSubstring(nonFatalSignRewardsErrors, err.Error()) {
logger.Infof("Non fatal error sending reward signature: %v", err)
logger.Debugf("Non fatal error sending reward signature: %v", err)
return nil
}
return err
Expand Down
2 changes: 1 addition & 1 deletion client/finalizer/finalization_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (fs *finalizationStorage) RemoveRoundsBefore(votingRoundID uint32) {
defer fs.Unlock()

for i := fs.lowestRoundStored; i < votingRoundID; i++ {
logger.Infof("Deleting round %d in finalization storage", i)
logger.Debugf("Deleting round %d in finalization storage", i)
delete(fs.stg, i)
}

Expand Down
12 changes: 5 additions & 7 deletions client/finalizer/finalizer_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ func (p *finalizerQueueProcessor) Run(ctx context.Context) error {
}

if p.isVoterForCurrentEpoch(item) {
logger.Infof("Finalizer with address %v was selected for item %v", p.relayClient.senderAddress, item)
logger.Infof("Finalizer with address %v was selected for voting round %v for protocol %v", p.relayClient.senderAddress, item.votingRoundID, item.protocolID)

p.processItem(ctx, item, false)
} else {
logger.Infof("Finalizer with address %v will send outside grace period for item %v", p.relayClient.senderAddress, item)
logger.Infof("Finalizer with address %v will send outside grace period for voting round %v for protocol %v", p.relayClient.senderAddress, item.votingRoundID, item.protocolID)

_, exists := p.finalizationStorage.Get(item.votingRoundID, item.protocolID, item.msgHash)
if exists {
Expand All @@ -144,11 +144,9 @@ func (p *finalizerQueueProcessor) Run(ctx context.Context) error {
st := votingRoundStartTime.Add(p.finalizerContext.gracePeriodEndOffset)

if st.Before(time.Now()) {
logger.Infof("Finalizer will send item %v now", item)
logger.Debugf("Finalizer will send now for voting round %v for protocol %v", item.votingRoundID, item.protocolID)
p.processItem(ctx, item, true)
}

logger.Infof("Finalizer will send item %v at %v", item, st)
p.delayedQueues.Add(st, item)
} else {
logger.Errorf("Finalizer missing finalization data for protocol %v in votingRound %v", item.protocolID, item.votingRoundID)
Expand Down Expand Up @@ -200,7 +198,7 @@ func (p *finalizerQueueProcessor) processItem(ctx context.Context, item *queueIt
}

logger.Infof("Relaying for round %d for protocol %d", item.votingRoundID, item.protocolID)
p.relayClient.SubmitPayloads(ctx, txInput, isDelayed)
p.relayClient.SubmitPayloads(ctx, txInput, isDelayed, item.protocolID)
}

func (p *finalizerQueueProcessor) processDelayedQueue(items []*queueItem) error {
Expand All @@ -217,7 +215,7 @@ func (p *finalizerQueueProcessor) processDelayedQueue(items []*queueItem) error
if relayedItems[*item] {
continue
}
logger.Infof("Finalizer processes delayed queue item %v", item)
logger.Infof("Finalizer processes delayed queue item for round %v for protocol %v", item.votingRoundID, item.protocolID)
p.processItem(context.TODO(), item, true)
}
return nil
Expand Down
16 changes: 10 additions & 6 deletions client/finalizer/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,31 @@ func (r *relayContractClient) SigningPolicyInitializedListener(db finalizerDB, s
}

// SubmitPayloads sends a transaction with input to Relay contract.
func (r *relayContractClient) SubmitPayloads(ctx context.Context, input []byte, dryRun bool) {
func (r *relayContractClient) SubmitPayloads(ctx context.Context, input []byte, dryRun bool, protocolID uint8) {
if len(input) == 0 {
return
}

execStatusChan := shared.ExecuteWithRetryChan(func() (any, error) {
execStatusChan := shared.ExecuteWithRetryChan(func() (string, error) {
err := r.chainClient.SendRawTx(r.privateKey, r.address, input, r.gasConfig, chain.DefaultTxTimeout)
if err != nil {
if shared.ExistsAsSubstring(nonFatalRelayErrors, err.Error()) {
logger.Infof("Non fatal error sending relay tx: %v", err)
logger.Debugf("Non fatal error sending relay tx for protocol %d: %v", protocolID, err)
return "non fatal error", nil

} else {
return nil, errors.Wrap(err, "Error sending relay tx")
return "", errors.Wrap(err, "Error sending relay tx")
}
}
return nil, nil
return "success", nil
}, shared.MaxTxSendRetries, shared.TxRetryInterval)

select {
case execStatus := <-execStatusChan:
if execStatus.Success {
logger.Info("Relaying finished")
logger.Infof("Relaying finished for protocol %d with %s", protocolID, execStatus.Value)
} else {
logger.Warnf("Relaying failed with: %v", execStatus.Message)
}

case <-ctx.Done():
Expand Down
5 changes: 3 additions & 2 deletions client/protocol/protocol_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/flare-foundation/go-flare-common/pkg/logger"
"github.com/flare-foundation/go-flare-common/pkg/payload"
)

const (
Expand Down Expand Up @@ -275,8 +276,8 @@ func (ep *testAPIEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

rsp := dataProviderResponse{
Status: "OK",
rsp := payload.SubprotocolResponse{
Status: payload.Ok,
Data: "0x" + strings.Repeat("ff", 38),
AdditionalData: "0x1234",
}
Expand Down
Loading

0 comments on commit b7deb36

Please sign in to comment.