Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: better handle reset of vote metrics #68

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/watcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func (w *BlockWatcher) OnNodeStart(ctx context.Context, node *rpc.Node) error {

blockResp, err := node.Client.Block(ctx, nil)
if err != nil {
log.Warn().Err(err).Msg("failed to get latest block")
log.Warn().Err(err).
Str("node", node.Redacted()).
Msg("failed to get latest block")
} else {
w.handleNodeBlock(ctx, node, blockResp.Block)
w.handleNodeBlock(node, blockResp.Block)
}

// Ticker to sync validator set
Expand All @@ -70,7 +72,7 @@ func (w *BlockWatcher) OnNodeStart(ctx context.Context, node *rpc.Node) error {
return
case <-ticker.C:
if err := w.syncValidatorSet(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to sync validator set")
log.Error().Err(err).Str("node", node.Redacted()).Msg("failed to sync validator set")
}
}
}
Expand All @@ -88,7 +90,7 @@ func (w *BlockWatcher) OnNewBlock(ctx context.Context, node *rpc.Node, evt *ctyp
blockEvent := evt.Data.(types.EventDataNewBlock)
block := blockEvent.Block

w.handleNodeBlock(ctx, node, block)
w.handleNodeBlock(node, block)

return nil
}
Expand All @@ -104,7 +106,7 @@ func (w *BlockWatcher) OnValidatorSetUpdates(ctx context.Context, node *rpc.Node
return nil
}

func (w *BlockWatcher) handleNodeBlock(ctx context.Context, node *rpc.Node, block *types.Block) {
func (w *BlockWatcher) handleNodeBlock(node *rpc.Node, block *types.Block) {
validatorSet := w.getValidatorSet()

if len(validatorSet) != block.LastCommit.Size() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (w *UpgradeWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch upgrade plan")
} else if err := w.fetchUpgrade(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch upgrade plan")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch upgrade plan")
}

select {
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (w *ValidatorsWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch validators")
} else if err := w.fetchValidators(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch staking validators")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch staking validators")
}

select {
Expand Down
84 changes: 55 additions & 29 deletions pkg/watcher/votes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (w *VotesWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch proposals")
} else if err := w.fetchProposals(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch pending proposals")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch pending proposals")
}

select {
Expand All @@ -55,17 +57,37 @@ func (w *VotesWatcher) Start(ctx context.Context) error {
}

func (w *VotesWatcher) fetchProposals(ctx context.Context, node *rpc.Node) error {
w.metrics.Vote.Reset()
var (
votes map[uint64]map[TrackedValidator]bool
err error
)

switch w.options.GovModuleVersion {
case "v1beta1":
return w.fetchProposalsV1Beta1(ctx, node)
votes, err = w.fetchProposalsV1Beta1(ctx, node)
default: // v1
return w.fetchProposalsV1(ctx, node)
votes, err = w.fetchProposalsV1(ctx, node)
}

if err != nil {
return err
}

w.metrics.Vote.Reset()
for proposalId, votes := range votes {
for validator, voted := range votes {
w.metrics.Vote.
WithLabelValues(node.ChainID(), validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}
}

return nil
}

func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) error {
func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) (map[uint64]map[TrackedValidator]bool, error) {
votes := make(map[uint64]map[TrackedValidator]bool)

clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := gov.NewQueryClient(clientCtx)

Expand All @@ -74,13 +96,14 @@ func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) err
ProposalStatus: gov.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
return votes, fmt.Errorf("failed to fetch proposals in voting period: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
votes[proposal.Id] = make(map[TrackedValidator]bool)
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.Id)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
Expand All @@ -95,34 +118,29 @@ func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) err
})

if isInvalidArgumentError(err) {
w.handleVoteV1(chainID, validator, proposal.Id, nil)
votes[proposal.Id][validator] = false
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err)
return votes, fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1(chainID, validator, proposal.Id, vote.Options)
voted := false
for _, option := range vote.Options {
if option.Option != gov.OptionEmpty {
voted = true
break
}
}
votes[proposal.Id][validator] = voted
}
}
}

return nil
return votes, nil
}

func (w *VotesWatcher) handleVoteV1(chainID string, validator TrackedValidator, proposalId uint64, votes []*gov.WeightedVoteOption) {
voted := false
for _, option := range votes {
if option.Option != gov.OptionEmpty {
voted = true
break
}
}
func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) (map[uint64]map[TrackedValidator]bool, error) {
votes := make(map[uint64]map[TrackedValidator]bool)

w.metrics.Vote.
WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}

func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := govbeta.NewQueryClient(clientCtx)

Expand All @@ -131,13 +149,14 @@ func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node
ProposalStatus: govbeta.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
return votes, fmt.Errorf("failed to fetch proposals in voting period: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
votes[proposal.ProposalId] = make(map[TrackedValidator]bool)
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.ProposalId)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
Expand All @@ -152,17 +171,24 @@ func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node
})

if isInvalidArgumentError(err) {
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, nil)
votes[proposal.ProposalId][validator] = false
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err)
return votes, fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, vote.Options)
voted := false
for _, option := range vote.Options {
if option.Option != govbeta.OptionEmpty {
voted = true
break
}
}
votes[proposal.ProposalId][validator] = voted
}
}
}

return nil
return votes, nil
}

func (w *VotesWatcher) handleVoteV1Beta1(chainID string, validator TrackedValidator, proposalId uint64, votes []govbeta.WeightedVoteOption) {
Expand Down
Loading