Skip to content

Commit

Permalink
chore: refacto vote watcher and x-gov version (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattKetmo authored Jan 18, 2024
1 parent 24ea911 commit a933a4b
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 258 deletions.
2 changes: 1 addition & 1 deletion pkg/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ var Flags = []cli.Flag{
&cli.StringFlag{
Name: "x-gov",
Usage: "version of the gov module to use (v1|v1beta1)",
Value: "v1beta1",
Value: "v1",
},
}
25 changes: 11 additions & 14 deletions pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,17 @@ func RunFunc(cCtx *cli.Context) error {
return validatorsWatcher.Start(ctx)
})
}
if xGov != "v1beta1" && xGov != "v1" {
log.Warn().Msgf("unknown gov module version: %s (fallback to v1)", xGov)
xGov = "v1"
}
if !noGov {
switch xGov {
case "v1beta1":
votesWatcher := watcher.NewVotesV1Beta1Watcher(trackedValidators, metrics, pool)
errg.Go(func() error {
return votesWatcher.Start(ctx)
})
case "v1":
votesWatcher := watcher.NewVotesV1Watcher(trackedValidators, metrics, pool)
errg.Go(func() error {
return votesWatcher.Start(ctx)
})
default:
log.Warn().Msgf("unknown gov module version: %s", xGov)
}
votesWatcher := watcher.NewVotesWatcher(trackedValidators, metrics, pool, watcher.VotesWatcherOptions{
GovModuleVersion: xGov,
})
errg.Go(func() error {
return votesWatcher.Start(ctx)
})
}
var wh *webhook.Webhook
if webhookURL != "" {
Expand All @@ -128,6 +124,7 @@ func RunFunc(cCtx *cli.Context) error {
}
upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, wh, watcher.UpgradeWatcherOptions{
CheckPendingProposals: !noGov,
GovModuleVersion: xGov,
})
errg.Go(func() error {
return upgradeWatcher.Start(ctx)
Expand Down
89 changes: 68 additions & 21 deletions pkg/watcher/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
ctypes "github.com/cometbft/cometbft/rpc/core/types"
comettypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/client"
gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1"
govbeta "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
"github.com/cosmos/cosmos-sdk/x/upgrade/types"
upgrade "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/gogo/protobuf/codec"
Expand All @@ -31,6 +33,7 @@ type UpgradeWatcher struct {

type UpgradeWatcherOptions struct {
CheckPendingProposals bool
GovModuleVersion string
}

func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, webhook *webhook.Webhook, options UpgradeWatcherOptions) *UpgradeWatcher {
Expand Down Expand Up @@ -135,7 +138,12 @@ func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error
plan := resp.Plan

if plan == nil && w.options.CheckPendingProposals {
plan, err = w.checkUpgradeProposals(ctx, node)
switch w.options.GovModuleVersion {
case "v1beta1":
plan, err = w.checkUpgradeProposalsV1Beta1(ctx, node)
default: // v1
plan, err = w.checkUpgradeProposalsV1(ctx, node)
}
if err != nil {
log.Error().Err(err).Msg("failed to check upgrade proposals")
}
Expand All @@ -146,7 +154,7 @@ func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error
return nil
}

func (w *UpgradeWatcher) checkUpgradeProposals(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) {
func (w *UpgradeWatcher) checkUpgradeProposalsV1(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := gov.NewQueryClient(clientCtx)

Expand All @@ -159,29 +167,68 @@ func (w *UpgradeWatcher) checkUpgradeProposals(ctx context.Context, node *rpc.No
}

for _, proposal := range proposalsResp.GetProposals() {
if proposal.Content == nil {
continue
for _, message := range proposal.Messages {
plan, err := extractUpgradePlan(message)
if err != nil {
return nil, fmt.Errorf("failed to extract upgrade plan: %w", err)
}
if plan != nil {
return plan, nil
}
}
}

cdc := codec.New(1)
return nil, nil
}

switch proposal.Content.TypeUrl {
case "/cosmos.upgrade.v1beta1.SoftwareUpgradeProposal":
var upgrade types.SoftwareUpgradeProposal
err := cdc.Unmarshal(proposal.Content.Value, &upgrade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err)
}
return &upgrade.Plan, nil
func (w *UpgradeWatcher) checkUpgradeProposalsV1Beta1(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := govbeta.NewQueryClient(clientCtx)

case "/cosmos.upgrade.v1beta1.MsgSoftwareUpgrade":
var upgrade types.MsgSoftwareUpgrade
err := cdc.Unmarshal(proposal.Content.Value, &upgrade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err)
}
return &upgrade.Plan, nil
// Fetch all proposals in voting period
proposalsResp, err := queryClient.Proposals(ctx, &govbeta.QueryProposalsRequest{
ProposalStatus: govbeta.StatusVotingPeriod,
})
if err != nil {
return nil, fmt.Errorf("failed to get proposals: %w", err)
}

for _, proposal := range proposalsResp.GetProposals() {
plan, err := extractUpgradePlan(proposal.Content)
if err != nil {
return nil, fmt.Errorf("failed to extract upgrade plan: %w", err)
}
if plan != nil {
return plan, nil
}
}

return nil, nil
}

func extractUpgradePlan(content *codectypes.Any) (*upgrade.Plan, error) {
if content == nil {
return nil, nil
}

cdc := codec.New(1)

switch content.TypeUrl {
case "/cosmos.upgrade.v1beta1.SoftwareUpgradeProposal":
var upgrade types.SoftwareUpgradeProposal
err := cdc.Unmarshal(content.Value, &upgrade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err)
}
return &upgrade.Plan, nil

case "/cosmos.upgrade.v1beta1.MsgSoftwareUpgrade":
var upgrade types.MsgSoftwareUpgrade
err := cdc.Unmarshal(content.Value, &upgrade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err)
}
return &upgrade.Plan, nil
}

return nil, nil
Expand Down
187 changes: 187 additions & 0 deletions pkg/watcher/votes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package watcher

import (
"context"
"fmt"
"time"

"github.com/cosmos/cosmos-sdk/client"
gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1"
govbeta "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/rs/zerolog/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type VotesWatcher struct {
metrics *metrics.Metrics
validators []TrackedValidator
pool *rpc.Pool
options VotesWatcherOptions
}

type VotesWatcherOptions struct {
GovModuleVersion string
}

func NewVotesWatcher(validators []TrackedValidator, metrics *metrics.Metrics, pool *rpc.Pool, options VotesWatcherOptions) *VotesWatcher {
return &VotesWatcher{
metrics: metrics,
validators: validators,
pool: pool,
options: options,
}
}

func (w *VotesWatcher) Start(ctx context.Context) error {
ticker := time.NewTicker(1 * time.Minute)

for {
node := w.pool.GetSyncedNode()
if node == nil {
log.Warn().Msg("no node available to fetch proposals")
} else if err := w.fetchProposalsV1(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch pending proposals")
}

select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
}
}

func (w *VotesWatcher) fetchProposals(ctx context.Context, node *rpc.Node) error {
switch w.options.GovModuleVersion {
case "v1beta1":
return w.fetchProposalsV1Beta1(ctx, node)
default: // v1
return w.fetchProposalsV1(ctx, node)
}
}
func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := gov.NewQueryClient(clientCtx)

// Fetch all proposals in voting period
proposalsResp, err := queryClient.Proposals(ctx, &gov.QueryProposalsRequest{
ProposalStatus: gov.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.Id)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
voter := validator.AccountAddress()
if voter == "" {
log.Warn().Str("validator", validator.Name).Msg("no account address for validator")
continue
}
voteResp, err := queryClient.Vote(ctx, &gov.QueryVoteRequest{
ProposalId: proposal.Id,
Voter: voter,
})

w.metrics.Vote.Reset()
if isInvalidArgumentError(err) {
w.handleVoteV1(chainID, validator, proposal.Id, nil)
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1(chainID, validator, proposal.Id, vote.Options)
}
}
}

return nil
}

func (w *VotesWatcher) handleVoteV1(chainID string, validator TrackedValidator, proposalId uint64, votes []*gov.WeightedVoteOption) {
voted := false
for _, option := range votes {
if option.Option != gov.OptionEmpty {
voted = true
break
}
}

w.metrics.Vote.
WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}

func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := govbeta.NewQueryClient(clientCtx)

// Fetch all proposals in voting period
proposalsResp, err := queryClient.Proposals(ctx, &govbeta.QueryProposalsRequest{
ProposalStatus: govbeta.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.ProposalId)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
voter := validator.AccountAddress()
if voter == "" {
log.Warn().Str("validator", validator.Name).Msg("no account address for validator")
continue
}
voteResp, err := queryClient.Vote(ctx, &govbeta.QueryVoteRequest{
ProposalId: proposal.ProposalId,
Voter: voter,
})

w.metrics.Vote.Reset()
if isInvalidArgumentError(err) {
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, nil)
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, vote.Options)
}
}
}

return nil
}

func (w *VotesWatcher) handleVoteV1Beta1(chainID string, validator TrackedValidator, proposalId uint64, votes []govbeta.WeightedVoteOption) {
voted := false
for _, option := range votes {
if option.Option != govbeta.OptionEmpty {
voted = true
break
}
}

w.metrics.Vote.
WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}

func isInvalidArgumentError(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
return st.Code() == codes.InvalidArgument
}
11 changes: 7 additions & 4 deletions pkg/watcher/votes_v1beta1_test.go → pkg/watcher/votes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ func TestVotesWatcher(t *testing.T) {
}
)

votesWatcher := NewVotesV1Beta1Watcher(
votesWatcher := NewVotesWatcher(
validators,
metrics.New("cosmos_validator_watcher"),
nil,
VotesWatcherOptions{
GovModuleVersion: "v1beta1",
},
)

t.Run("Handle Votes", func(t *testing.T) {
votesWatcher.handleVote(chainID, validators[0], 40, nil)
votesWatcher.handleVote(chainID, validators[0], 41, []gov.WeightedVoteOption{{Option: gov.OptionEmpty}})
votesWatcher.handleVote(chainID, validators[0], 42, []gov.WeightedVoteOption{{Option: gov.OptionYes}})
votesWatcher.handleVoteV1Beta1(chainID, validators[0], 40, nil)
votesWatcher.handleVoteV1Beta1(chainID, validators[0], 41, []gov.WeightedVoteOption{{Option: gov.OptionEmpty}})
votesWatcher.handleVoteV1Beta1(chainID, validators[0], 42, []gov.WeightedVoteOption{{Option: gov.OptionYes}})

assert.Equal(t, float64(0), testutil.ToFloat64(votesWatcher.metrics.Vote.WithLabelValues(chainID, kilnAddress, kilnName, "40")))
assert.Equal(t, float64(0), testutil.ToFloat64(votesWatcher.metrics.Vote.WithLabelValues(chainID, kilnAddress, kilnName, "41")))
Expand Down
Loading

0 comments on commit a933a4b

Please sign in to comment.