From 2956a5be60ed103a132643506bf2610aa01d4e03 Mon Sep 17 00:00:00 2001 From: Matt Ketmo Date: Tue, 19 Dec 2023 16:39:14 +0100 Subject: [PATCH] feat: send webhook on planned upgrade (#43) --- README.md | 3 +- pkg/app/flags.go | 4 ++ pkg/app/run.go | 29 +++++++++++---- pkg/watcher/upgrade.go | 74 ++++++++++++++++++++++++++++++++++++- pkg/watcher/upgrade_test.go | 1 + pkg/webhook/webhook.go | 70 +++++++++++++++++++++++++++++++++++ 6 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 pkg/webhook/webhook.go diff --git a/README.md b/README.md index 525d7d1..e73c1d6 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Features: - Track the **staked amount** as well as the min seat price - Track **pending proposals** and check if your validator has voted (including proposal end time) - Expose **upgrade plan** to know when the next upgrade will happen (including pending proposals) -- Trigger webhook when an upgrade happens (soon) +- Trigger webhook when an upgrade happens ![Cosmos Validator Watcher Screenshot](assets/cosmos-validator-watcher-screenshot.jpg) @@ -75,6 +75,7 @@ GLOBAL OPTIONS: --denom value denom used in metrics label (eg. atom or uatom) --denom-exponent value denom exponent (eg. 6 for atom, 1 for uatom) (default: 0) --validator value [ --validator value ] validator address(es) to track (use :my-label to add a custom label in metrics & ouput) + --webhook-url value endpoint where to send upgrade webhooks --x-gov value version of the gov module to use (v1|v1beta1) (default: "v1beta1") --help, -h show help --version, -v print the version diff --git a/pkg/app/flags.go b/pkg/app/flags.go index 4a7369a..1c4d3f9 100644 --- a/pkg/app/flags.go +++ b/pkg/app/flags.go @@ -51,6 +51,10 @@ var Flags = []cli.Flag{ Name: "validator", Usage: "validator address(es) to track (use :my-label to add a custom label in metrics & ouput)", }, + &cli.StringFlag{ + Name: "webhook-url", + Usage: "endpoint where to send upgrade webhooks (experimental)", + }, &cli.StringFlag{ Name: "x-gov", Usage: "version of the gov module to use (v1|v1beta1)", diff --git a/pkg/app/run.go b/pkg/app/run.go index 138a38c..31147df 100644 --- a/pkg/app/run.go +++ b/pkg/app/run.go @@ -3,6 +3,7 @@ package app import ( "context" "fmt" + "net/url" "os" "os/signal" "syscall" @@ -17,6 +18,7 @@ import ( "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" "github.com/kilnfi/cosmos-validator-watcher/pkg/watcher" + "github.com/kilnfi/cosmos-validator-watcher/pkg/webhook" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/samber/lo" @@ -40,6 +42,7 @@ func RunFunc(cCtx *cli.Context) error { denom = cCtx.String("denom") denomExpon = cCtx.Uint("denom-exponent") validators = cCtx.StringSlice("validator") + webhookURL = cCtx.String("webhook-url") xGov = cCtx.String("x-gov") ) @@ -86,12 +89,6 @@ func RunFunc(cCtx *cli.Context) error { errg.Go(func() error { return statusWatcher.Start(ctx) }) - // Register watchers on nodes events - for _, node := range pool.Nodes { - node.OnStart(blockWatcher.OnNodeStart) - node.OnStatus(statusWatcher.OnNodeStatus) - node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock) - } // // Pool watchers @@ -121,13 +118,31 @@ func RunFunc(cCtx *cli.Context) error { log.Warn().Msgf("unknown gov module version: %s", xGov) } } - upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, watcher.UpgradeWatcherOptions{ + var wh *webhook.Webhook + if webhookURL != "" { + whURL, err := url.Parse(webhookURL) + if err != nil { + return fmt.Errorf("failed to parse webhook endpoint: %w", err) + } + wh = webhook.New(*whURL) + } + upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, wh, watcher.UpgradeWatcherOptions{ CheckPendingProposals: !noGov, }) errg.Go(func() error { return upgradeWatcher.Start(ctx) }) + // + // Register watchers on nodes events + // + for _, node := range pool.Nodes { + node.OnStart(blockWatcher.OnNodeStart) + node.OnStatus(statusWatcher.OnNodeStatus) + node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock) + node.OnEvent(rpc.EventNewBlock, upgradeWatcher.OnNewBlock) + } + // // Start Pool // diff --git a/pkg/watcher/upgrade.go b/pkg/watcher/upgrade.go index bf65470..6e9513e 100644 --- a/pkg/watcher/upgrade.go +++ b/pkg/watcher/upgrade.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + comettypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/client" gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" "github.com/cosmos/cosmos-sdk/x/upgrade/types" @@ -12,23 +14,30 @@ import ( "github.com/gogo/protobuf/codec" "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" + "github.com/kilnfi/cosmos-validator-watcher/pkg/webhook" "github.com/rs/zerolog/log" ) type UpgradeWatcher struct { metrics *metrics.Metrics pool *rpc.Pool + webhook *webhook.Webhook options UpgradeWatcherOptions + + nextUpgradePlan *upgrade.Plan // known upgrade plan + latestBlockHeight int64 // latest block received + latestWebhookSent int64 // latest block for which webhook has been sent } type UpgradeWatcherOptions struct { CheckPendingProposals bool } -func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, options UpgradeWatcherOptions) *UpgradeWatcher { +func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, webhook *webhook.Webhook, options UpgradeWatcherOptions) *UpgradeWatcher { return &UpgradeWatcher{ metrics: metrics, pool: pool, + webhook: webhook, options: options, } } @@ -52,6 +61,68 @@ func (w *UpgradeWatcher) Start(ctx context.Context) error { } } +func (w *UpgradeWatcher) OnNewBlock(ctx context.Context, node *rpc.Node, evt *ctypes.ResultEvent) error { + // Ignore is webhook is not configured + if w.webhook == nil { + return nil + } + + // Ignore if no upgrade plan + if w.nextUpgradePlan == nil { + return nil + } + + // Ignore blocks if node is catching up + if !node.IsSynced() { + return nil + } + + blockEvent := evt.Data.(comettypes.EventDataNewBlock) + block := blockEvent.Block + + // Skip already processed blocks + if w.latestBlockHeight >= block.Height { + return nil + } + + w.latestBlockHeight = block.Height + + // Ignore if upgrade plan is for a future block + if block.Height < w.nextUpgradePlan.Height-1 { + return nil + } + + // Ignore if webhook has already been sent + if w.latestWebhookSent >= w.nextUpgradePlan.Height { + return nil + } + + // Upgrade plan is for this block + go w.triggerWebhook(ctx, node.ChainID(), *w.nextUpgradePlan) + w.nextUpgradePlan = nil + w.latestWebhookSent = w.nextUpgradePlan.Height + + return nil +} + +func (w *UpgradeWatcher) triggerWebhook(ctx context.Context, chainID string, plan upgrade.Plan) { + msg := struct { + Type string `json:"type"` + Block int64 `json:"block"` + ChainID string `json:"chain_id"` + Version string `json:"version"` + }{ + Type: "upgrade", + Block: plan.Height, + ChainID: chainID, + Version: plan.Name, + } + + if err := w.webhook.Send(ctx, msg); err != nil { + log.Error().Err(err).Msg("failed to send upgrade webhook") + } +} + func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error { clientCtx := (client.Context{}).WithClient(node.Client) queryClient := upgrade.NewQueryClient(clientCtx) @@ -122,5 +193,6 @@ func (w *UpgradeWatcher) handleUpgradePlan(chainID string, plan *upgrade.Plan) { return } + w.nextUpgradePlan = plan w.metrics.UpgradePlan.WithLabelValues(chainID, plan.Name).Set(float64(plan.Height)) } diff --git a/pkg/watcher/upgrade_test.go b/pkg/watcher/upgrade_test.go index 59b4031..4e8f2f0 100644 --- a/pkg/watcher/upgrade_test.go +++ b/pkg/watcher/upgrade_test.go @@ -15,6 +15,7 @@ func TestUpgradeWatcher(t *testing.T) { watcher := NewUpgradeWatcher( metrics.New("cosmos_validator_watcher"), nil, + nil, UpgradeWatcherOptions{}, ) diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go new file mode 100644 index 0000000..bfc77e0 --- /dev/null +++ b/pkg/webhook/webhook.go @@ -0,0 +1,70 @@ +package webhook + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/avast/retry-go/v4" + "github.com/rs/zerolog/log" +) + +type Webhook struct { + endpoint url.URL + client *http.Client +} + +func New(endpoint url.URL) *Webhook { + return &Webhook{ + endpoint: endpoint, + client: &http.Client{}, + } +} + +func (w *Webhook) Send(ctx context.Context, message interface{}) error { + body, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + log.Info().Msgf("sending webhook: %s", body) + + req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint.String(), bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + retryOpts := []retry.Option{ + retry.Context(ctx), + retry.Delay(1 * time.Second), + retry.Attempts(3), + retry.OnRetry(func(_ uint, err error) { + log.Warn().Err(err).Msgf("retrying webhook on %s", w.endpoint.String()) + }), + } + + return retry.Do(func() error { + return w.postRequest(ctx, req) + }, retryOpts...) +} + +func (w *Webhook) postRequest(ctx context.Context, req *http.Request) error { + resp, err := w.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + // Check if response is not 4xx or 5xx + if resp.StatusCode >= 400 { + return fmt.Errorf("unexpected response status: %s", resp.Status) + } + + return nil +}