diff --git a/core/capabilities/targets/monitoring.go b/core/capabilities/targets/monitoring.go new file mode 100644 index 00000000000..a34a946e351 --- /dev/null +++ b/core/capabilities/targets/monitoring.go @@ -0,0 +1,38 @@ +package targets + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" + + localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring" +) + +type writeTargetMetricsLabeler struct { + metrics.Labeler + chainWriterFailureCount metric.Int64Counter +} + +func newWriteTargetMetricsLabeler(labeler metrics.Labeler) (*writeTargetMetricsLabeler, error) { + chainWriterFailureCount, err := beholder.GetMeter().Int64Counter("write_target_failures_count") + if err != nil { + return nil, fmt.Errorf("failed to register write target failure counter: %w", err) + } + return &writeTargetMetricsLabeler{ + Labeler: labeler, + chainWriterFailureCount: chainWriterFailureCount, + }, nil +} + +func (l *writeTargetMetricsLabeler) with(keyValues ...string) *writeTargetMetricsLabeler { + return &writeTargetMetricsLabeler{l.With(keyValues...), l.chainWriterFailureCount} +} + +func (l *writeTargetMetricsLabeler) incrementChainWriterFailureCount(ctx context.Context) { + otelLabels := localMonitoring.KvMapToOtelAttributes(l.Labels) + l.chainWriterFailureCount.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} diff --git a/core/capabilities/targets/write_target.go b/core/capabilities/targets/write_target.go index 9315a1ee199..4f4929ef03a 100644 --- a/core/capabilities/targets/write_target.go +++ b/core/capabilities/targets/write_target.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" @@ -37,6 +38,7 @@ type WriteTarget struct { receiverGasMinimum uint64 capabilities.CapabilityInfo + metrics *writeTargetMetricsLabeler emitter custmsg.MessageEmitter lggr logger.Logger @@ -69,12 +71,16 @@ func NewWriteTarget( cw commontypes.ChainWriter, forwarderAddress string, txGasLimit uint64, -) *WriteTarget { +) (*WriteTarget, error) { info := capabilities.MustNewCapabilityInfo( id, capabilities.CapabilityTypeTarget, "Write target.", ) + labeler, err := newWriteTargetMetricsLabeler(metrics.NewLabeler()) + if err != nil { + return nil, fmt.Errorf("failed to create write target metrics labeler: %w", err) + } return &WriteTarget{ cr, @@ -86,10 +92,11 @@ func NewWriteTarget( forwarderAddress, txGasLimit - ForwarderContractLogicGasCost, info, + labeler, custmsg.NewLabeler(), logger.Named(lggr, "WriteTarget"), false, - } + }, nil } // Note: This should be a shared type that the OCR3 package validates as well @@ -228,8 +235,15 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap return capabilities.CapabilityResponse{}, err } + wtMetrics := cap.metrics.with( + platform.KeyWorkflowID, request.Metadata.WorkflowID, + platform.KeyWorkflowName, request.Metadata.WorkflowName, + platform.KeyWorkflowOwner, request.Metadata.WorkflowOwner, + ) + rawExecutionID, err := hex.DecodeString(request.Metadata.WorkflowExecutionID) if err != nil { + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, err } @@ -245,6 +259,7 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap } var transmissionInfo TransmissionInfo if err = cap.cr.GetLatestValue(ctx, cap.binding.ReadIdentifier("getTransmissionInfo"), primitives.Unconfirmed, queryInputs, &transmissionInfo); err != nil { + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, fmt.Errorf("failed to getTransmissionInfo latest value: %w", err) } @@ -269,11 +284,13 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap cap.lggr.Infow("non-empty report - retrying a failed transmission - attempting to push to txmgr", "request", request, "reportLen", len(request.Inputs.SignedReport.Report), "reportContextLen", len(request.Inputs.SignedReport.Context), "nSignatures", len(request.Inputs.SignedReport.Signatures), "executionID", request.Metadata.WorkflowExecutionID, "receiverGasMinimum", receiverGasMinimum, "transmissionGasLimit", transmissionInfo.GasLimit) } default: + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, fmt.Errorf("unexpected transmission state: %v", transmissionInfo.State) } txID, err := uuid.NewUUID() // NOTE: CW expects us to generate an ID, rather than return one if err != nil { + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, err } @@ -308,10 +325,12 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap value := big.NewInt(0) if err := cap.cw.SubmitTransaction(ctx, "forwarder", "report", req, txID.String(), cap.forwarderAddress, &meta, value); err != nil { if !commontypes.ErrSettingTransactionGasLimitNotSupported.Is(err) { + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, fmt.Errorf("failed to submit transaction: %w", err) } meta.GasLimit = nil if err := cap.cw.SubmitTransaction(ctx, "forwarder", "report", req, txID.String(), cap.forwarderAddress, &meta, value); err != nil { + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, fmt.Errorf("failed to submit transaction: %w", err) } } @@ -346,6 +365,7 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap if err != nil { cap.lggr.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) } + wtMetrics.incrementChainWriterFailureCount(ctx) return capabilities.CapabilityResponse{}, fmt.Errorf("submitted transaction failed: %w", err) default: cap.lggr.Debugw("Unexpected transaction status", "request", request, "transaction", txID, "status", txStatus) diff --git a/core/capabilities/targets/write_target_test.go b/core/capabilities/targets/write_target_test.go index 38136f07df0..e5ed628a177 100644 --- a/core/capabilities/targets/write_target_test.go +++ b/core/capabilities/targets/write_target_test.go @@ -33,7 +33,8 @@ func TestWriteTarget(t *testing.T) { forwarderA := testutils.NewAddress() forwarderAddr := forwarderA.Hex() - writeTarget := targets.NewWriteTarget(lggr, "test-write-target@1.0.0", cr, cw, forwarderAddr, 400_000) + writeTarget, err := targets.NewWriteTarget(lggr, "test-write-target@1.0.0", cr, cw, forwarderAddr, 400_000) + require.NoError(t, err) require.NotNil(t, writeTarget) config, err := values.NewMap(map[string]any{ diff --git a/core/services/relay/evm/write_target.go b/core/services/relay/evm/write_target.go index cd30e8ab3c3..ace564a385b 100644 --- a/core/services/relay/evm/write_target.go +++ b/core/services/relay/evm/write_target.go @@ -73,5 +73,10 @@ func NewWriteTarget(ctx context.Context, relayer *Relayer, chain legacyevm.Chain return nil, err } - return targets.NewWriteTarget(logger.Named(lggr, "WriteTarget"), id, cr, cw, config.ForwarderAddress().String(), gasLimitDefault), nil + wt, err := targets.NewWriteTarget(logger.Named(lggr, "WriteTarget"), id, cr, cw, config.ForwarderAddress().String(), gasLimitDefault) + if err != nil { + return nil, err + } + + return wt, nil }