Skip to content

Commit

Permalink
feat: add an option to trigger a webhook at any block
Browse files Browse the repository at this point in the history
  • Loading branch information
MattKetmo committed Aug 21, 2024
1 parent 92d728b commit 8427923
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 30 deletions.
4 changes: 4 additions & 0 deletions pkg/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ var Flags = []cli.Flag{
Name: "webhook-url",
Usage: "endpoint where to send upgrade webhooks (experimental)",
},
&cli.StringSliceFlag{
Name: "webhook-custom-block",
Usage: "trigger a custom webhook at a given block number (experimental)",
},
&cli.StringFlag{
Name: "x-gov",
Usage: "version of the gov module to use (v1|v1beta1)",
Expand Down
68 changes: 42 additions & 26 deletions pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/cometbft/cometbft/rpc/client/http"
Expand All @@ -31,23 +32,24 @@ func RunFunc(cCtx *cli.Context) error {
ctx = cCtx.Context

// Config flags
chainID = cCtx.String("chain-id")
httpAddr = cCtx.String("http-addr")
logLevel = cCtx.String("log-level")
namespace = cCtx.String("namespace")
noColor = cCtx.Bool("no-color")
nodes = cCtx.StringSlice("node")
noGov = cCtx.Bool("no-gov")
noStaking = cCtx.Bool("no-staking")
noUpgrade = cCtx.Bool("no-upgrade")
noCommission = cCtx.Bool("no-commission")
denom = cCtx.String("denom")
denomExpon = cCtx.Uint("denom-exponent")
startTimeout = cCtx.Duration("start-timeout")
stopTimeout = cCtx.Duration("stop-timeout")
validators = cCtx.StringSlice("validator")
webhookURL = cCtx.String("webhook-url")
xGov = cCtx.String("x-gov")
chainID = cCtx.String("chain-id")
httpAddr = cCtx.String("http-addr")
logLevel = cCtx.String("log-level")
namespace = cCtx.String("namespace")
noColor = cCtx.Bool("no-color")
nodes = cCtx.StringSlice("node")
noGov = cCtx.Bool("no-gov")
noStaking = cCtx.Bool("no-staking")
noUpgrade = cCtx.Bool("no-upgrade")
noCommission = cCtx.Bool("no-commission")
denom = cCtx.String("denom")
denomExpon = cCtx.Uint("denom-exponent")
startTimeout = cCtx.Duration("start-timeout")
stopTimeout = cCtx.Duration("stop-timeout")
validators = cCtx.StringSlice("validator")
webhookURL = cCtx.String("webhook-url")
webhookCustomBlocks = cCtx.StringSlice("webhook-custom-block")
xGov = cCtx.String("x-gov")
)

//
Expand Down Expand Up @@ -84,12 +86,34 @@ func RunFunc(cCtx *cli.Context) error {
return err
}

var wh *webhook.Webhook
if webhookURL != "" {
whURL, err := url.Parse(webhookURL)
if err != nil {
return fmt.Errorf("failed to parse webhook endpoint: %w", err)
}
wh = webhook.New(*whURL)
}

// Custom block webhooks
blockWebhooks := []watcher.BlockWebhook{}
for _, block := range webhookCustomBlocks {
blockHeight, err := strconv.ParseInt(block, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse block height for custom webhook (%s): %w", block, err)
}
blockWebhooks = append(blockWebhooks, watcher.BlockWebhook{
Height: blockHeight,
Metadata: map[string]string{},
})
}

//
// Node Watchers
//
metrics := metrics.New(namespace)
metrics.Register()
blockWatcher := watcher.NewBlockWatcher(trackedValidators, metrics, os.Stdout)
blockWatcher := watcher.NewBlockWatcher(trackedValidators, metrics, os.Stdout, wh, blockWebhooks)
errg.Go(func() error {
return blockWatcher.Start(ctx)
})
Expand Down Expand Up @@ -128,14 +152,6 @@ func RunFunc(cCtx *cli.Context) error {
return votesWatcher.Start(ctx)
})
}
var wh *webhook.Webhook
if webhookURL != "" {
whURL, err := url.Parse(webhookURL)
if err != nil {
return fmt.Errorf("failed to parse webhook endpoint: %w", err)
}
wh = webhook.New(*whURL)
}

var upgradeWatcher *watcher.UpgradeWatcher
if !noUpgrade {
Expand Down
54 changes: 51 additions & 3 deletions pkg/watcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ import (
"github.com/fatih/color"
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
)

type BlockWebhook struct {
Height int64 `json:"height"`
Metadata map[string]string `json:"metadata"`
}

type BlockWatcher struct {
trackedValidators []TrackedValidator
metrics *metrics.Metrics
Expand All @@ -25,14 +31,18 @@ type BlockWatcher struct {
validatorSet atomic.Value // []*types.Validator
latestBlockHeight int64
latestBlockProposer string
webhook *webhook.Webhook
customWebhooks []BlockWebhook
}

func NewBlockWatcher(validators []TrackedValidator, metrics *metrics.Metrics, writer io.Writer) *BlockWatcher {
func NewBlockWatcher(validators []TrackedValidator, metrics *metrics.Metrics, writer io.Writer, webhook *webhook.Webhook, customWebhooks []BlockWebhook) *BlockWatcher {
return &BlockWatcher{
trackedValidators: validators,
metrics: metrics,
writer: writer,
blockChan: make(chan *BlockInfo),
webhook: webhook,
customWebhooks: customWebhooks,
}
}

Expand All @@ -42,7 +52,7 @@ func (w *BlockWatcher) Start(ctx context.Context) error {
case <-ctx.Done():
return nil
case block := <-w.blockChan:
w.handleBlockInfo(block)
w.handleBlockInfo(ctx, block)
}
}
}
Expand Down Expand Up @@ -159,7 +169,7 @@ func (w *BlockWatcher) syncValidatorSet(ctx context.Context, n *rpc.Node) error
return nil
}

func (w *BlockWatcher) handleBlockInfo(block *BlockInfo) {
func (w *BlockWatcher) handleBlockInfo(ctx context.Context, block *BlockInfo) {
chainId := block.ChainID

if w.latestBlockHeight >= block.Height {
Expand Down Expand Up @@ -220,6 +230,9 @@ func (w *BlockWatcher) handleBlockInfo(block *BlockInfo) {
strings.Join(validatorStatus, " "),
)

// Handle webhooks
w.handleWebhooks(ctx, block)

w.latestBlockHeight = block.Height
w.latestBlockProposer = block.ProposerAddress
}
Expand Down Expand Up @@ -261,3 +274,38 @@ func (w *BlockWatcher) isValidatorActive(address string) bool {
}
return false
}

func (w *BlockWatcher) handleWebhooks(ctx context.Context, block *BlockInfo) {
if len(w.customWebhooks) == 0 {
return
}

newWebhooks := []BlockWebhook{}

for _, webhook := range w.customWebhooks {
// If webhook block height is passed
if webhook.Height <= block.Height {
w.triggerWebhook(ctx, block.ChainID, webhook)
} else {
newWebhooks = append(newWebhooks, webhook)
}
}

w.customWebhooks = newWebhooks
}

func (w *BlockWatcher) triggerWebhook(ctx context.Context, chainID string, wh BlockWebhook) {
msg := make(map[string]string)
msg["type"] = "custom"
msg["block"] = fmt.Sprintf("%d", wh.Height)
msg["chain_id"] = chainID
for k, v := range wh.Metadata {
msg[k] = v
}

go func() {
if err := w.webhook.Send(context.Background(), msg); err != nil {
log.Error().Err(err).Msg("failed to send upgrade webhook")
}
}()
}
4 changes: 4 additions & 0 deletions pkg/watcher/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package watcher

import (
"bytes"
"net/url"
"strings"
"testing"

"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/prometheus/client_golang/prometheus/testutil"
"gotest.tools/assert"
)
Expand All @@ -26,6 +28,8 @@ func TestBlockWatcher(t *testing.T) {
},
metrics.New("cosmos_validator_watcher"),
&bytes.Buffer{},
webhook.New(url.URL{}),
[]BlockWebhook{},
)

t.Run("Handle BlockInfo", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (w *Webhook) Send(ctx context.Context, message interface{}) error {
}, retryOpts...)
}

func (w *Webhook) postRequest(ctx context.Context, req *http.Request) error {
func (w *Webhook) postRequest(_ context.Context, req *http.Request) error {
resp, err := w.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
Expand Down

0 comments on commit 8427923

Please sign in to comment.