From 070af99e75d18434703e39e946823c5a176e707f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20Petruni=C4=87?= Date: Wed, 23 Oct 2024 14:28:18 +0200 Subject: [PATCH 1/2] feat: system and resource metrics (#37) --- go.mod | 2 +- observability/metrics.go | 14 ++- observability/metrics/system.go | 215 ++++++++++++++++++++++++++++++++ 3 files changed, 228 insertions(+), 3 deletions(-) create mode 100644 observability/metrics/system.go diff --git a/go.mod b/go.mod index a1836255..5fab6f7f 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/mitchellh/mapstructure v1.4.2 // indirect github.com/pierrec/xxHash v0.1.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect go.opentelemetry.io/otel/sdk v1.16.0 diff --git a/observability/metrics.go b/observability/metrics.go index 1b6e25c2..afe10b93 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/sygmaprotocol/sygma-core/observability/metrics" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" @@ -58,6 +59,8 @@ func InitMetricProvider(ctx context.Context, agentURL string) (*sdkmetric.MeterP } type RelayerMetrics struct { + *metrics.SystemMetrics + meter metric.Meter Opts api.MeasurementOption @@ -73,11 +76,11 @@ type RelayerMetrics struct { } // NewRelayerMetrics initializes OpenTelemetry metrics -func NewRelayerMetrics(meter metric.Meter, attributes ...attribute.KeyValue) (*RelayerMetrics, error) { +func NewRelayerMetrics(ctx context.Context, meter metric.Meter, attributes ...attribute.KeyValue) (*RelayerMetrics, error) { opts := api.WithAttributes(attributes...) blockDeltaMap := make(map[uint8]*big.Int) - blockDeltaGauge, err := meter.Int64ObservableGauge( + blockDeltaGauge, _ := meter.Int64ObservableGauge( "relayer.BlockDelta", metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { for domainID, delta := range blockDeltaMap { @@ -90,7 +93,14 @@ func NewRelayerMetrics(meter metric.Meter, attributes ...attribute.KeyValue) (*R }), metric.WithDescription("Difference between chain head and current indexed block per domain"), ) + + systemMetrics, err := metrics.NewSystemMetrics(ctx, meter, opts) + if err != nil { + return nil, err + } + return &RelayerMetrics{ + SystemMetrics: systemMetrics, meter: meter, MessageEventTime: make(map[string]time.Time), Opts: opts, diff --git a/observability/metrics/system.go b/observability/metrics/system.go new file mode 100644 index 00000000..65208493 --- /dev/null +++ b/observability/metrics/system.go @@ -0,0 +1,215 @@ +package metrics + +import ( + "context" + "runtime" + "runtime/debug" + "time" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/mem" + "github.com/shirou/gopsutil/net" + "go.opentelemetry.io/otel/metric" +) + +const ( + GC_STATS_UPDATE_PERIOD = time.Second * 10 +) + +type SystemMetrics struct { + opts metric.MeasurementOption + + goRoutinesGauge metric.Int64ObservableGauge + totalMemoryGauge metric.Int64ObservableGauge + usedMemoryGauge metric.Int64ObservableGauge + cpuUsageGauge metric.Float64ObservableGauge + gcDurationHistogram metric.Float64Histogram + diskUsageGauge metric.Int64ObservableGauge + totalDiskGauge metric.Int64ObservableGauge + networkIOReceivedGauge metric.Int64ObservableGauge + networkIOSentGauge metric.Int64ObservableGauge +} + +// NewSystemMetrics initializes system performance and resource utilization metrics +func NewSystemMetrics(ctx context.Context, meter metric.Meter, opts metric.MeasurementOption) (*SystemMetrics, error) { + goRoutinesGauge, err := meter.Int64ObservableGauge( + "relayer.GoRoutines", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + result.Observe(int64(runtime.NumGoroutine()), opts) + return nil + }), + metric.WithDescription("Number of Go routines running."), + ) + if err != nil { + return nil, err + } + + usedMemoryGauge, err := meter.Int64ObservableGauge( + "relayer.MemoryUsageBytes", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + v, err := mem.VirtualMemory() + if err != nil { + return err + } + + result.Observe(int64(v.Used), opts) + return nil + }), + metric.WithDescription("Memory usage in bytes."), + ) + if err != nil { + return nil, err + } + totalMemoryGauge, err := meter.Int64ObservableGauge( + "relayer.TotalMemoryBytes", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + v, err := mem.VirtualMemory() + if err != nil { + return err + } + + result.Observe(int64(v.Total), opts) + return nil + }), + metric.WithDescription("Total memory in bytes."), + ) + if err != nil { + return nil, err + } + + cpuUsageGauge, err := meter.Float64ObservableGauge( + "relayer.CpuUsagePercent", + metric.WithFloat64Callback(func(context context.Context, result metric.Float64Observer) error { + percents, err := cpu.Percent(0, false) + if err != nil { + return err + } + + result.Observe(percents[0], opts) + return nil + }), + metric.WithDescription("CPU usage percent."), + ) + if err != nil { + return nil, err + } + + diskUsageGauge, err := meter.Int64ObservableGauge( + "relayer.DiskUsageBytes", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + usage, err := disk.Usage("/") + if err != nil { + return err + } + + result.Observe(int64(usage.Used), opts) + return nil + }), + metric.WithDescription("Disk space used by the relayer in bytes."), + ) + if err != nil { + return nil, err + } + totalDiskGauge, err := meter.Int64ObservableGauge( + "relayer.TotalDiskBytes", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + usage, err := disk.Usage("/") + if err != nil { + return err + } + + result.Observe(int64(usage.Total), opts) + return nil + }), + metric.WithDescription("Total relayer disk space."), + ) + if err != nil { + return nil, err + } + + networkIOReceivedGauge, err := meter.Int64ObservableGauge( + "relayer.NetworkIOBytesReceived", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + stat, err := net.IOCounters(false) + if err != nil { + return err + } + + result.Observe(int64(stat[0].BytesRecv), opts) + return nil + }), + metric.WithDescription("Total network bytes received."), + ) + if err != nil { + return nil, err + } + networkIOSentGauge, err := meter.Int64ObservableGauge( + "relayer.NetworkIOBytesSent", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + stat, err := net.IOCounters(false) + if err != nil { + return err + } + + result.Observe(int64(stat[0].BytesSent), opts) + return nil + }), + metric.WithDescription("Total network bytes sent."), + ) + if err != nil { + return nil, err + } + + gcDurationHistogram, err := meter.Float64Histogram( + "relayer.GcDurationSeconds", + metric.WithDescription("Duration of garbage collection cycles."), + ) + if err != nil { + return nil, err + } + + m := &SystemMetrics{ + opts: opts, + goRoutinesGauge: goRoutinesGauge, + totalMemoryGauge: totalMemoryGauge, + usedMemoryGauge: usedMemoryGauge, + gcDurationHistogram: gcDurationHistogram, + cpuUsageGauge: cpuUsageGauge, + totalDiskGauge: totalDiskGauge, + diskUsageGauge: diskUsageGauge, + networkIOReceivedGauge: networkIOReceivedGauge, + networkIOSentGauge: networkIOSentGauge, + } + + go m.updateGCStats(ctx) + return m, err +} + +func (m *SystemMetrics) updateGCStats(ctx context.Context) { + ticker := time.NewTicker(GC_STATS_UPDATE_PERIOD) + var previousPauseDuration float64 + for { + select { + case <-ticker.C: + { + var gcStats debug.GCStats + debug.ReadGCStats(&gcStats) + if len(gcStats.Pause) == 0 { + continue + } + + recentPauseDuration := gcStats.Pause[0].Seconds() + if recentPauseDuration == previousPauseDuration { + continue + } + + m.gcDurationHistogram.Record(context.Background(), recentPauseDuration, m.opts) + previousPauseDuration = recentPauseDuration + } + case <-ctx.Done(): + return + + } + } +} From c4b524fe99e12a7a5b3e4540c9de1fae316f354c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20Petruni=C4=87?= Date: Wed, 23 Oct 2024 15:39:36 +0200 Subject: [PATCH 2/2] feat: chain and gas usage metrics (#38) --- Makefile | 2 +- chains/evm/transactor/monitored/monitored.go | 39 ++++- .../transactor/monitored/monitored_test.go | 19 +++ mock/{signAndSend.go => monitored.go} | 39 ++++- observability/metrics.go | 140 ++++++++++-------- observability/metrics/chain.go | 131 ++++++++++++++++ observability/metrics/system.go | 1 + 7 files changed, 304 insertions(+), 67 deletions(-) rename mock/{signAndSend.go => monitored.go} (50%) create mode 100644 observability/metrics/chain.go diff --git a/Makefile b/Makefile index 79eea798..c36e2f01 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ genmocks: mockgen -destination=./mock/gas.go -source=./chains/evm/transactor/gas/gas-pricer.go -package mock mockgen -destination=./mock/relayer.go -source=./relayer/relayer.go -package mock mockgen -source=chains/evm/transactor/transact.go -destination=./mock/transact.go -package mock - mockgen -source=chains/evm/transactor/signAndSend/signAndSend.go -destination=./mock/signAndSend.go -package mock + mockgen -source=chains/evm/transactor/monitored/monitored.go -destination=./mock/monitored.go -package mock mockgen -source=./store/store.go -destination=./mock/store.go -package mock mockgen -source=./relayer/message/handler.go -destination=./mock/message.go -package mock mockgen -source=./chains/evm/listener/listener.go -destination=./mock/evmListener.go -package mock diff --git a/chains/evm/transactor/monitored/monitored.go b/chains/evm/transactor/monitored/monitored.go index 785cf0e8..38776824 100644 --- a/chains/evm/transactor/monitored/monitored.go +++ b/chains/evm/transactor/monitored/monitored.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/sygmaprotocol/sygma-core/chains/evm/client" @@ -19,6 +20,10 @@ type GasPricer interface { GasPrice(priority *uint8) ([]*big.Int, error) } +type GasTracker interface { + TrackGasUsage(domainID uint8, gasUsed uint64, gasPrice *big.Int) +} + type RawTx struct { nonce uint64 to *common.Address @@ -30,9 +35,26 @@ type RawTx struct { creationTime time.Time } +// GasPrice returns transaction gas price in gwei +// +// It returns base fee for both London and legacy transactions. +func (tx *RawTx) GasPrice() *big.Int { + var gasPrice *big.Int + if len(tx.gasPrice) == 1 { + gasPrice = tx.gasPrice[0] + } else { + gasPrice = tx.gasPrice[1] + } + return new(big.Int).Div(gasPrice, big.NewInt(1e9)) +} + type MonitoredTransactor struct { + domainID uint8 + log zerolog.Logger + txFabric transaction.TxFabric gasPriceClient GasPricer + gasTracker GasTracker client client.Client maxGasPrice *big.Int @@ -49,15 +71,20 @@ type MonitoredTransactor struct { // Gas price is increased by increasePercentage param which // is a percentage value with which old gas price should be increased (e.g 15) func NewMonitoredTransactor( + domainID uint8, txFabric transaction.TxFabric, gasPriceClient GasPricer, + gasTracker GasTracker, client client.Client, maxGasPrice *big.Int, increasePercentage *big.Int, ) *MonitoredTransactor { return &MonitoredTransactor{ + domainID: domainID, + log: log.With().Uint8("domainID", domainID).Logger(), client: client, gasPriceClient: gasPriceClient, + gasTracker: gasTracker, txFabric: txFabric, pendingTxns: make(map[common.Hash]RawTx), maxGasPrice: maxGasPrice, @@ -143,10 +170,12 @@ func (t *MonitoredTransactor) Monitor( for oldHash, tx := range pendingTxCopy { receipt, err := t.client.TransactionReceipt(context.Background(), oldHash) if err == nil { + t.gasTracker.TrackGasUsage(t.domainID, receipt.GasUsed, tx.GasPrice()) + if receipt.Status == types.ReceiptStatusSuccessful { - log.Info().Uint64("nonce", tx.nonce).Msgf("Executed transaction %s with nonce %d", oldHash, tx.nonce) + t.log.Info().Uint64("nonce", tx.nonce).Msgf("Executed transaction %s with nonce %d", oldHash, tx.nonce) } else { - log.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s failed on chain", oldHash) + t.log.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s failed on chain", oldHash) } delete(t.pendingTxns, oldHash) @@ -154,7 +183,7 @@ func (t *MonitoredTransactor) Monitor( } if time.Since(tx.creationTime) > txTimeout { - log.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s has timed out", oldHash) + t.log.Error().Uint64("nonce", tx.nonce).Msgf("Transaction %s has timed out", oldHash) delete(t.pendingTxns, oldHash) continue } @@ -164,7 +193,7 @@ func (t *MonitoredTransactor) Monitor( hash, err := t.resendTransaction(&tx) if err != nil { - log.Warn().Uint64("nonce", tx.nonce).Err(err).Msgf("Failed resending transaction %s", hash) + t.log.Warn().Uint64("nonce", tx.nonce).Err(err).Msgf("Failed resending transaction %s", hash) continue } @@ -188,7 +217,7 @@ func (t *MonitoredTransactor) resendTransaction(tx *RawTx) (common.Hash, error) return common.Hash{}, err } - log.Debug().Uint64("nonce", tx.nonce).Msgf("Resent transaction with hash %s", hash) + t.log.Debug().Uint64("nonce", tx.nonce).Msgf("Resent transaction with hash %s", hash) return hash, nil } diff --git a/chains/evm/transactor/monitored/monitored_test.go b/chains/evm/transactor/monitored/monitored_test.go index bcd10d28..e5e2a5fe 100644 --- a/chains/evm/transactor/monitored/monitored_test.go +++ b/chains/evm/transactor/monitored/monitored_test.go @@ -21,6 +21,7 @@ type TransactorTestSuite struct { suite.Suite gomockController *gomock.Controller mockClient *mock.MockClient + mockGasTracker *mock.MockGasTracker mockTransactor *mock.MockTransactor mockGasPricer *mock.MockGasPricer } @@ -34,6 +35,8 @@ func (s *TransactorTestSuite) SetupTest() { s.mockClient = mock.NewMockClient(s.gomockController) s.mockTransactor = mock.NewMockTransactor(s.gomockController) s.mockGasPricer = mock.NewMockGasPricer(s.gomockController) + s.mockGasTracker = mock.NewMockGasTracker(s.gomockController) + s.mockGasTracker.EXPECT().TrackGasUsage(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() } func (s *TransactorTestSuite) TestTransactor_SignAndSend_Success() { @@ -47,8 +50,10 @@ func (s *TransactorTestSuite) TestTransactor_SignAndSend_Success() { s.mockClient.EXPECT().UnlockNonce() t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(1000), big.NewInt(15)) @@ -72,8 +77,10 @@ func (s *TransactorTestSuite) TestTransactor_SignAndSend_Fail() { s.mockClient.EXPECT().UnlockNonce() t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(1000), big.NewInt(15)) @@ -99,8 +106,10 @@ func (s *TransactorTestSuite) TestTransactor_MonitoredTransaction_SuccessfulExec ctx, cancel := context.WithCancel(context.Background()) t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(1000), big.NewInt(15)) @@ -134,8 +143,10 @@ func (s *TransactorTestSuite) TestTransactor_MonitoredTransaction_TxTimeout() { ctx, cancel := context.WithCancel(context.Background()) t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(1000), big.NewInt(15)) @@ -169,8 +180,10 @@ func (s *TransactorTestSuite) TestTransactor_MonitoredTransaction_TransactionRes ctx, cancel := context.WithCancel(context.Background()) t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(1000), big.NewInt(15)) @@ -208,8 +221,10 @@ func (s *TransactorTestSuite) TestTransactor_MonitoredTransaction_MaxGasPriceRea ctx, cancel := context.WithCancel(context.Background()) t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(10), big.NewInt(15)) @@ -233,8 +248,10 @@ func (s *TransactorTestSuite) TestTransactor_MonitoredTransaction_MaxGasPriceRea func (s *TransactorTestSuite) TestTransactor_IncreaseGas_15PercentIncrease() { t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(150), big.NewInt(15)) @@ -246,8 +263,10 @@ func (s *TransactorTestSuite) TestTransactor_IncreaseGas_15PercentIncrease() { func (s *TransactorTestSuite) TestTransactor_IncreaseGas_MaxGasReached() { t := monitored.NewMonitoredTransactor( + 1, transaction.NewTransaction, s.mockGasPricer, + s.mockGasTracker, s.mockClient, big.NewInt(15), big.NewInt(15)) diff --git a/mock/signAndSend.go b/mock/monitored.go similarity index 50% rename from mock/signAndSend.go rename to mock/monitored.go index a91de2db..4a2f5cc0 100644 --- a/mock/signAndSend.go +++ b/mock/monitored.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: chains/evm/transactor/signAndSend/signAndSend.go +// Source: chains/evm/transactor/monitored/monitored.go // // Generated by this command: // -// mockgen -source=chains/evm/transactor/signAndSend/signAndSend.go -destination=./mock/signAndSend.go -package mock +// mockgen -source=chains/evm/transactor/monitored/monitored.go -destination=./mock/monitored.go -package mock // // Package mock is a generated GoMock package. package mock @@ -52,3 +52,38 @@ func (mr *MockGasPricerMockRecorder) GasPrice(priority any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockGasPricer)(nil).GasPrice), priority) } + +// MockGasTracker is a mock of GasTracker interface. +type MockGasTracker struct { + ctrl *gomock.Controller + recorder *MockGasTrackerMockRecorder +} + +// MockGasTrackerMockRecorder is the mock recorder for MockGasTracker. +type MockGasTrackerMockRecorder struct { + mock *MockGasTracker +} + +// NewMockGasTracker creates a new mock instance. +func NewMockGasTracker(ctrl *gomock.Controller) *MockGasTracker { + mock := &MockGasTracker{ctrl: ctrl} + mock.recorder = &MockGasTrackerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockGasTracker) EXPECT() *MockGasTrackerMockRecorder { + return m.recorder +} + +// TrackGasUsage mocks base method. +func (m *MockGasTracker) TrackGasUsage(domainID uint8, gasUsed uint64, gasPrice *big.Int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TrackGasUsage", domainID, gasUsed, gasPrice) +} + +// TrackGasUsage indicates an expected call of TrackGasUsage. +func (mr *MockGasTrackerMockRecorder) TrackGasUsage(domainID, gasUsed, gasPrice any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackGasUsage", reflect.TypeOf((*MockGasTracker)(nil).TrackGasUsage), domainID, gasUsed, gasPrice) +} diff --git a/observability/metrics.go b/observability/metrics.go index afe10b93..86413718 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -2,10 +2,7 @@ package observability import ( "context" - "math/big" "net/url" - "sync" - "time" "github.com/sygmaprotocol/sygma-core/observability/metrics" "go.opentelemetry.io/otel/attribute" @@ -13,23 +10,12 @@ import ( "go.opentelemetry.io/otel/metric" api "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" sdkresource "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" ) -func initResource() *sdkresource.Resource { - res, _ := sdkresource.New(context.Background(), - sdkresource.WithProcess(), - sdkresource.WithTelemetrySDK(), - sdkresource.WithHost(), - sdkresource.WithAttributes( - semconv.ServiceName("relayer"), - ), - ) - return res -} - -func InitMetricProvider(ctx context.Context, agentURL string) (*sdkmetric.MeterProvider, error) { +func InitMetricProvider(ctx context.Context, agentURL string, opts ...sdkmetric.Option) (*sdkmetric.MeterProvider, error) { collectorURL, err := url.Parse(agentURL) if err != nil { return nil, err @@ -50,68 +36,104 @@ func InitMetricProvider(ctx context.Context, agentURL string) (*sdkmetric.MeterP httpMetricReader := sdkmetric.NewPeriodicReader(metricHTTPExporter) + opts = append(opts, sdkmetric.WithReader(httpMetricReader)) + opts = append(opts, sdkmetric.WithResource(initResource())) + opts = append(opts, sdkmetric.WithView(initSecondView())) + opts = append(opts, sdkmetric.WithView(initGasView())) meterProvider := sdkmetric.NewMeterProvider( - sdkmetric.WithReader(httpMetricReader), - sdkmetric.WithResource(initResource()), + opts..., ) - return meterProvider, nil } -type RelayerMetrics struct { - *metrics.SystemMetrics +func initResource() *sdkresource.Resource { + res, _ := sdkresource.New(context.Background(), + sdkresource.WithProcess(), + sdkresource.WithTelemetrySDK(), + sdkresource.WithHost(), + sdkresource.WithAttributes( + semconv.ServiceName("relayer"), + ), + ) + return res +} - meter metric.Meter - Opts api.MeasurementOption +func initSecondView() sdkmetric.View { + return sdkmetric.NewView( + sdkmetric.Instrument{ + Unit: "s", + }, + sdkmetric.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{ + 0.000001, // 1 µs + 0.00001, // 10 µs + 0.0001, // 100 µs + 0.001, // 1 ms + 0.005, // 5 ms + 0.01, // 10 ms + 0.05, // 50 ms + 0.1, // 100 ms + 0.5, // 500 ms + 1.0, // 1 s + 5.0, // 5 s + 10.0, // 10 s + }, + NoMinMax: false, + }, + }, + ) +} - DepositEventCount metric.Int64Counter - MessageEventTime map[string]time.Time - ExecutionErrorCount metric.Int64Counter - ExecutionLatency metric.Int64Histogram - ExecutionLatencyPerRoute metric.Int64Histogram - BlockDelta metric.Int64ObservableGauge - BlockDeltaMap map[uint8]*big.Int +func initGasView() sdkmetric.View { + return sdkmetric.NewView( + sdkmetric.Instrument{ + Unit: "gas", + }, + sdkmetric.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{ + 10000, + 20000, + 50000, + 100000, + 500000, + 1000000, + 5000000, + 10000000, + 15000000, + 30000000, + }, + NoMinMax: false, + }, + }, + ) +} - lock sync.Mutex +type RelayerMetrics struct { + *metrics.SystemMetrics + *metrics.ChainMetrics + + Opts api.MeasurementOption } // NewRelayerMetrics initializes OpenTelemetry metrics func NewRelayerMetrics(ctx context.Context, meter metric.Meter, attributes ...attribute.KeyValue) (*RelayerMetrics, error) { opts := api.WithAttributes(attributes...) - blockDeltaMap := make(map[uint8]*big.Int) - blockDeltaGauge, _ := meter.Int64ObservableGauge( - "relayer.BlockDelta", - metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { - for domainID, delta := range blockDeltaMap { - result.Observe(delta.Int64(), - opts, - metric.WithAttributes(attribute.Int64("domainID", int64(domainID))), - ) - } - return nil - }), - metric.WithDescription("Difference between chain head and current indexed block per domain"), - ) - systemMetrics, err := metrics.NewSystemMetrics(ctx, meter, opts) if err != nil { return nil, err } + chainMetrics, err := metrics.NewChainMetrics(ctx, meter, opts) + if err != nil { + return nil, err + } + return &RelayerMetrics{ - SystemMetrics: systemMetrics, - meter: meter, - MessageEventTime: make(map[string]time.Time), - Opts: opts, - BlockDelta: blockDeltaGauge, - BlockDeltaMap: blockDeltaMap, + SystemMetrics: systemMetrics, + ChainMetrics: chainMetrics, + Opts: opts, }, err } - -func (t *RelayerMetrics) TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int) { - t.lock.Lock() - defer t.lock.Unlock() - - t.BlockDeltaMap[domainID] = new(big.Int).Sub(head, current) -} diff --git a/observability/metrics/chain.go b/observability/metrics/chain.go new file mode 100644 index 00000000..2ad20f99 --- /dev/null +++ b/observability/metrics/chain.go @@ -0,0 +1,131 @@ +package metrics + +import ( + "context" + "math/big" + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type ChainMetrics struct { + opts metric.MeasurementOption + + blockDeltaGauge metric.Int64ObservableGauge + blockDeltaMap map[uint8]*big.Int + processedBlockMap map[uint8]*big.Int + processedBlockGauge metric.Int64ObservableGauge + chainHeadMap map[uint8]*big.Int + chainHeadGauge metric.Int64ObservableGauge + lock sync.Mutex + + gasUsedHistogram metric.Int64Histogram + gasPriceHistogram metric.Int64Histogram +} + +// NewChainMetrics initializes metrics that provide insight into chain processing and activity +func NewChainMetrics(ctx context.Context, meter metric.Meter, opts metric.MeasurementOption) (*ChainMetrics, error) { + blockDeltaMap := make(map[uint8]*big.Int) + blockDeltaGauge, err := meter.Int64ObservableGauge( + "relayer.BlockDelta", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + for domainID, delta := range blockDeltaMap { + result.Observe(delta.Int64(), + opts, + metric.WithAttributes(attribute.Int64("domainID", int64(domainID))), + ) + } + return nil + }), + metric.WithDescription("Difference between chain head and current indexed block per domain"), + ) + if err != nil { + return nil, err + } + + chainHeadMap := make(map[uint8]*big.Int) + chainHeadGauge, err := meter.Int64ObservableGauge( + "relayer.ChainHead", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + for domainID, head := range chainHeadMap { + result.Observe(head.Int64(), + opts, + metric.WithAttributes(attribute.Int64("domainID", int64(domainID))), + ) + } + return nil + }), + metric.WithDescription("Latest block of the chain."), + ) + if err != nil { + return nil, err + } + + processedBlockMap := make(map[uint8]*big.Int) + processedBlockGauge, err := meter.Int64ObservableGauge( + "relayer.ProcessedBlocks", + metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { + for domainID, block := range processedBlockMap { + result.Observe(block.Int64(), + opts, + metric.WithAttributes(attribute.Int64("domainID", int64(domainID))), + ) + } + return nil + }), + metric.WithDescription("Latest processed block."), + ) + if err != nil { + return nil, err + } + + gasUsedHistogram, err := meter.Int64Histogram( + "relayer.GasUsed", + metric.WithDescription("Gas used per transaction."), + metric.WithUnit("gas"), + ) + if err != nil { + return nil, err + } + + gasPriceHistogram, err := meter.Int64Histogram( + "relayer.GasPrice", + metric.WithDescription("Gas price distribution per transaction in gwei."), + ) + if err != nil { + return nil, err + } + + return &ChainMetrics{ + opts: opts, + blockDeltaMap: blockDeltaMap, + chainHeadMap: chainHeadMap, + blockDeltaGauge: blockDeltaGauge, + chainHeadGauge: chainHeadGauge, + processedBlockGauge: processedBlockGauge, + processedBlockMap: processedBlockMap, + gasUsedHistogram: gasUsedHistogram, + gasPriceHistogram: gasPriceHistogram, + }, nil +} + +func (m *ChainMetrics) TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int) { + m.lock.Lock() + defer m.lock.Unlock() + + m.blockDeltaMap[domainID] = new(big.Int).Sub(head, current) + m.processedBlockMap[domainID] = new(big.Int).Set(current) + m.chainHeadMap[domainID] = new(big.Int).Set(head) +} + +func (m *ChainMetrics) TrackGasUsage(domainID uint8, gasUsed uint64, gasPrice *big.Int) { + m.gasPriceHistogram.Record( + context.Background(), + gasPrice.Int64(), + metric.WithAttributes(attribute.Int64("domainID", int64(domainID)))) + m.gasUsedHistogram.Record( + context.Background(), + int64(gasUsed), + metric.WithAttributes(attribute.Int64("domainID", int64(domainID)))) +} diff --git a/observability/metrics/system.go b/observability/metrics/system.go index 65208493..67c06a0e 100644 --- a/observability/metrics/system.go +++ b/observability/metrics/system.go @@ -164,6 +164,7 @@ func NewSystemMetrics(ctx context.Context, meter metric.Meter, opts metric.Measu gcDurationHistogram, err := meter.Float64Histogram( "relayer.GcDurationSeconds", metric.WithDescription("Duration of garbage collection cycles."), + metric.WithUnit("s"), ) if err != nil { return nil, err