From cbccdf70d2f37b77176cd50aa3e370dc7ed92ec2 Mon Sep 17 00:00:00 2001 From: Jim McDonald Date: Thu, 7 Dec 2023 22:57:26 +0000 Subject: [PATCH] Update dependencies. --- clients.go | 8 +- go.mod | 4 +- go.sum | 8 +- main.go | 77 ++++++++++------- .../beaconblockproposer/standard/propose.go | 83 ++----------------- .../standard/propose_test.go | 5 -- services/metrics/prometheus/accountmanager.go | 13 ++- services/metrics/prometheus/attestation.go | 38 +++++++-- .../prometheus/attestationaggregation.go | 45 ++++++++-- .../prometheus/beaconcommitteesubscription.go | 33 +++++++- services/metrics/prometheus/client.go | 36 +++++++- services/metrics/prometheus/controller.go | 19 ++++- services/metrics/prometheus/parameters.go | 17 ++-- services/metrics/prometheus/scheduler.go | 27 +++++- services/metrics/prometheus/service.go | 24 +++--- services/metrics/prometheus/service_test.go | 9 -- .../prometheus/synccommitteeaggregation.go | 45 ++++++++-- .../prometheus/synccommitteemessenger.go | 38 +++++++-- .../prometheus/synccommitteesubscriber.go | 26 +++++- util/commit.go | 29 +++++++ 20 files changed, 395 insertions(+), 189 deletions(-) create mode 100644 util/commit.go diff --git a/clients.go b/clients.go index 2b235139..d719d332 100644 --- a/clients.go +++ b/clients.go @@ -22,6 +22,7 @@ import ( eth2client "github.com/attestantio/go-eth2-client" httpclient "github.com/attestantio/go-eth2-client/http" multiclient "github.com/attestantio/go-eth2-client/multi" + "github.com/attestantio/vouch/services/metrics" "github.com/attestantio/vouch/util" "github.com/pkg/errors" ) @@ -32,7 +33,7 @@ var ( ) // fetchClient fetches a client service, instantiating it if required. -func fetchClient(ctx context.Context, address string) (eth2client.Service, error) { +func fetchClient(ctx context.Context, monitor metrics.Service, address string) (eth2client.Service, error) { if address == "" { return nil, errors.New("no address supplied for client") } @@ -45,6 +46,7 @@ func fetchClient(ctx context.Context, address string) (eth2client.Service, error var err error client, err = httpclient.New(ctx, httpclient.WithLogLevel(util.LogLevel(fmt.Sprintf("eth2client.%s", address))), + httpclient.WithMonitor(monitor), httpclient.WithTimeout(util.Timeout(fmt.Sprintf("eth2client.%s", address))), httpclient.WithAddress(address), httpclient.WithExtraHeaders(map[string]string{ @@ -63,7 +65,7 @@ func fetchClient(ctx context.Context, address string) (eth2client.Service, error } // fetchMulticlient fetches a multiclient service, instantiating it if required. -func fetchMultiClient(ctx context.Context, addresses []string) (eth2client.Service, error) { +func fetchMultiClient(ctx context.Context, monitor metrics.Service, addresses []string) (eth2client.Service, error) { if len(addresses) == 0 { return nil, errors.New("no addresses supplied for multiclient") } @@ -78,7 +80,7 @@ func fetchMultiClient(ctx context.Context, addresses []string) (eth2client.Servi // Fetch or create the individual clients. clients := make([]eth2client.Service, 0, len(addresses)) for _, address := range addresses { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { log.Error().Err(err).Str("address", address).Msg("Cannot access client for multiclient; dropping from list") continue diff --git a/go.mod b/go.mod index 62db6f16..bdd58c6d 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.20 require ( github.com/attestantio/go-block-relay v0.2.0 - github.com/attestantio/go-builder-client v0.4.1 - github.com/attestantio/go-eth2-client v0.19.5 + github.com/attestantio/go-builder-client v0.4.2 + github.com/attestantio/go-eth2-client v0.19.7 github.com/aws/aws-sdk-go v1.46.4 github.com/holiman/uint256 v1.2.3 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 22f22b72..8c5965ae 100644 --- a/go.sum +++ b/go.sum @@ -76,10 +76,10 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/attestantio/go-block-relay v0.2.0 h1:RiIDW6nLAfTYsD7bQ4W3MJVDlTIifyDUsHQvm5HWF3k= github.com/attestantio/go-block-relay v0.2.0/go.mod h1:6pjIGKOdIwP1AF0+TFF9kBVS1CueHec5sLSZX4U9XBg= -github.com/attestantio/go-builder-client v0.4.1 h1:1F4JVW9ElebH8cVhxs3jFSaWHy4NXGCePdUM7em/fLs= -github.com/attestantio/go-builder-client v0.4.1/go.mod h1:3u2Y8lHSPFaNJSRG1QFQsh11w+x7+5SLBNM6ajbqyxU= -github.com/attestantio/go-eth2-client v0.19.5 h1:4V+vhXsCYji5jWrlONbr03GV7qoLRdzq96dLgXaqmek= -github.com/attestantio/go-eth2-client v0.19.5/go.mod h1:mZve1kV9Ctj0I1HH9gdg+MnI8lZ+Cb2EktEtOYrBlsM= +github.com/attestantio/go-builder-client v0.4.2 h1:EycfAFqQV+ooc2z6hmTsbuH4TCLknr0aO0nHLHLMpJM= +github.com/attestantio/go-builder-client v0.4.2/go.mod h1:e02i/WO4fjs3/u9oIZEjiC8CK1Qyxy4cpiMMGKx4VqQ= +github.com/attestantio/go-eth2-client v0.19.7 h1:1cX2rYz9tMZGhXTCe5Ax3C9fmHx1igih21+MU1eO5ls= +github.com/attestantio/go-eth2-client v0.19.7/go.mod h1:mZve1kV9Ctj0I1HH9gdg+MnI8lZ+Cb2EktEtOYrBlsM= github.com/aws/aws-sdk-go v1.44.81/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.46.4 h1:48tKgtm9VMPkb6y7HuYlsfhQmoIRAsTEXTsWLVlty4M= github.com/aws/aws-sdk-go v1.46.4/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= diff --git a/main.go b/main.go index cbc01c4c..87e55c73 100644 --- a/main.go +++ b/main.go @@ -142,7 +142,7 @@ func main2() int { } logModules() - log.Info().Str("version", ReleaseVersion).Msg("Starting vouch") + log.Info().Str("version", ReleaseVersion).Str("commit_hash", util.CommitHash()).Msg("Starting vouch") initProfiling() @@ -277,14 +277,14 @@ func initProfiling() { } } -func startClient(ctx context.Context) (eth2client.Service, error) { +func startClient(ctx context.Context, monitor metrics.Service) (eth2client.Service, error) { log.Trace().Msg("Starting consensus client service") var consensusClient eth2client.Service var err error if len(viper.GetStringSlice("beacon-node-addresses")) > 0 { - consensusClient, err = fetchMultiClient(ctx, viper.GetStringSlice("beacon-node-addresses")) + consensusClient, err = fetchMultiClient(ctx, monitor, viper.GetStringSlice("beacon-node-addresses")) } else { - consensusClient, err = fetchClient(ctx, viper.GetString("beacon-node-address")) + consensusClient, err = fetchClient(ctx, monitor, viper.GetString("beacon-node-address")) } if err != nil { return nil, err @@ -392,7 +392,7 @@ func startServices(ctx context.Context, default: eventsBeaconNodeAddresses = util.BeaconNodeAddresses("strategies.attestationdata") } - eventsConsensusClient, err := fetchMultiClient(ctx, eventsBeaconNodeAddresses) + eventsConsensusClient, err := fetchMultiClient(ctx, monitor, eventsBeaconNodeAddresses) if err != nil { return nil, nil, errors.Wrap(err, "failed to fetch multiclient for controller") } @@ -442,7 +442,14 @@ func startBasicServices(ctx context.Context, metrics.Service, error, ) { - eth2Client, err := startClient(ctx) + // Initialise monitor without chainTime service and server for now, so the + // client can provide metrics. + monitor, err := startMonitor(ctx, nil, false) + if err != nil { + return nil, nil, nil, errors.Wrap(err, "failed to start metrics service") + } + + eth2Client, err := startClient(ctx, monitor) if err != nil { return nil, nil, nil, err } @@ -457,7 +464,8 @@ func startBasicServices(ctx context.Context, } log.Trace().Msg("Starting metrics service") - monitor, err := startMonitor(ctx, chainTime) + // Reinitialise monitor with chainTime service and an operational server. + monitor, err = startMonitor(ctx, chainTime, true) if err != nil { return nil, nil, nil, errors.Wrap(err, "failed to start metrics service") } @@ -830,15 +838,22 @@ func initMajordomo(ctx context.Context) (majordomo.Service, error) { } // startMonitor starts the relevant metrics monitor given user input. -func startMonitor(ctx context.Context, chainTime chaintime.Service) (metrics.Service, error) { +func startMonitor(ctx context.Context, + chainTime chaintime.Service, + createServer bool, +) ( + metrics.Service, + error, +) { log.Trace().Msg("Starting metrics service") var monitor metrics.Service - if viper.Get("metrics.prometheus.listen-address") != "" { + if viper.Get("metrics.prometheus.listen-address") != nil { var err error monitor, err = prometheusmetrics.New(ctx, prometheusmetrics.WithLogLevel(util.LogLevel("metrics.prometheus")), prometheusmetrics.WithAddress(viper.GetString("metrics.prometheus.listen-address")), prometheusmetrics.WithChainTime(chainTime), + prometheusmetrics.WithCreateServer(createServer), ) if err != nil { return nil, errors.Wrap(err, "failed to start prometheus metrics service") @@ -1047,7 +1062,7 @@ func selectAttestationDataProvider(ctx context.Context, log.Info().Msg("Starting best attestation data strategy") attestationDataProviders := make(map[string]eth2client.AttestationDataProvider) for _, address := range util.BeaconNodeAddresses("strategies.attestationdata.best") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for attestation data strategy", address)) } @@ -1069,7 +1084,7 @@ func selectAttestationDataProvider(ctx context.Context, log.Info().Msg("Starting first attestation data strategy") attestationDataProviders := make(map[string]eth2client.AttestationDataProvider) for _, address := range util.BeaconNodeAddresses("strategies.attestationdata.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for attestation data strategy", address)) } @@ -1107,7 +1122,7 @@ func selectAggregateAttestationProvider(ctx context.Context, log.Info().Msg("Starting best aggregate attestation strategy") aggregateAttestationProviders := make(map[string]eth2client.AggregateAttestationProvider) for _, address := range util.BeaconNodeAddresses("strategies.aggregateattestation.best") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for aggregate attestation strategy", address)) } @@ -1127,7 +1142,7 @@ func selectAggregateAttestationProvider(ctx context.Context, log.Info().Msg("Starting first aggregate attestation strategy") aggregateAttestationProviders := make(map[string]eth2client.AggregateAttestationProvider) for _, address := range util.BeaconNodeAddresses("strategies.aggregateattestation.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for aggregate attestation strategy", address)) } @@ -1164,7 +1179,7 @@ func selectProposalProvider(ctx context.Context, log.Info().Msg("Starting best beacon block proposal strategy") proposalProviders := make(map[string]eth2client.ProposalProvider) for _, address := range util.BeaconNodeAddresses("strategies.beaconblockproposal.best") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon block proposal strategy", address)) } @@ -1190,7 +1205,7 @@ func selectProposalProvider(ctx context.Context, log.Info().Msg("Starting first beacon block proposal strategy") proposalProviders := make(map[string]eth2client.ProposalProvider) for _, address := range util.BeaconNodeAddresses("strategies.beaconblockproposal.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon block proposal strategy", address)) } @@ -1227,7 +1242,7 @@ func selectBlindedProposalProvider(ctx context.Context, log.Info().Msg("Starting best blinded beacon block proposal strategy") blindedProposalProviders := make(map[string]eth2client.BlindedProposalProvider) for _, address := range util.BeaconNodeAddresses("strategies.blindedbeaconblockproposal.best") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for blinded beacon block proposal strategy", address)) } @@ -1252,7 +1267,7 @@ func selectBlindedProposalProvider(ctx context.Context, log.Info().Msg("Starting first blinded beacon block proposal strategy") blindedProposalProviders := make(map[string]eth2client.BlindedProposalProvider) for _, address := range util.BeaconNodeAddresses("strategies.blindedbeaconblockproposal.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for blinded beacon block proposal strategy", address)) } @@ -1288,7 +1303,7 @@ func selectSyncCommitteeContributionProvider(ctx context.Context, log.Info().Msg("Starting best sync committee contribution strategy") syncCommitteeContributionProviders := make(map[string]eth2client.SyncCommitteeContributionProvider) for _, address := range util.BeaconNodeAddresses("strategies.synccommitteecontribution.best") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee contribution strategy", address)) } @@ -1308,7 +1323,7 @@ func selectSyncCommitteeContributionProvider(ctx context.Context, log.Info().Msg("Starting first sync committee contribution strategy") syncCommitteeContributionProviders := make(map[string]eth2client.SyncCommitteeContributionProvider) for _, address := range util.BeaconNodeAddresses("strategies.synccommitteecontribution.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee contribution strategy", address)) } @@ -1344,7 +1359,7 @@ func selectBeaconBlockRootProvider(ctx context.Context, log.Info().Msg("Starting majority beacon block root strategy") beaconBlockRootProviders := make(map[string]eth2client.BeaconBlockRootProvider) for _, address := range util.BeaconNodeAddresses("strategies.beaconblockroot.majority") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon block root strategy", address)) } @@ -1366,7 +1381,7 @@ func selectBeaconBlockRootProvider(ctx context.Context, log.Info().Msg("Starting first beacon block root strategy") beaconBlockRootProviders := make(map[string]eth2client.BeaconBlockRootProvider) for _, address := range util.BeaconNodeAddresses("strategies.beaconblockroot.first") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon block root strategy", address)) } @@ -1428,7 +1443,7 @@ func startMultinodeSubmitter(ctx context.Context, ) { aggregateAttestationSubmitters := make(map[string]eth2client.AggregateAttestationsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.aggregateattestation.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for aggregate attestation submitter strategy", address)) } @@ -1437,7 +1452,7 @@ func startMultinodeSubmitter(ctx context.Context, attestationsSubmitters := make(map[string]eth2client.AttestationsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.attestation.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for attestation submitter strategy", address)) } @@ -1446,7 +1461,7 @@ func startMultinodeSubmitter(ctx context.Context, proposalSubmitters := make(map[string]eth2client.ProposalSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.proposal.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal submitter strategy", address)) } @@ -1455,7 +1470,7 @@ func startMultinodeSubmitter(ctx context.Context, beaconCommitteeSubscriptionsSubmitters := make(map[string]eth2client.BeaconCommitteeSubscriptionsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.beaconcommitteesubscription.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon committee subscription submitter strategy", address)) } @@ -1464,7 +1479,7 @@ func startMultinodeSubmitter(ctx context.Context, proposalPreparationSubmitters := make(map[string]eth2client.ProposalPreparationsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.proposalpreparation.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal preparation submitter strategy", address)) } @@ -1473,7 +1488,7 @@ func startMultinodeSubmitter(ctx context.Context, syncCommitteeContributionsSubmitters := make(map[string]eth2client.SyncCommitteeContributionsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.synccommitteecontribution.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee contribution submitter strategy", address)) } @@ -1482,7 +1497,7 @@ func startMultinodeSubmitter(ctx context.Context, syncCommitteeMessagesSubmitters := make(map[string]eth2client.SyncCommitteeMessagesSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.synccommitteemessage.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee message submitter strategy", address)) } @@ -1491,7 +1506,7 @@ func startMultinodeSubmitter(ctx context.Context, syncCommitteeSubscriptionsSubmitters := make(map[string]eth2client.SyncCommitteeSubscriptionsSubmitter) for _, address := range util.BeaconNodeAddresses("submitter.synccommitteesubscription.multinode") { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee subscription submitter strategy", address)) } @@ -1596,7 +1611,7 @@ func startBlockRelay(ctx context.Context, secondaryValidatorRegistrationsSubmitters := make([]eth2client.ValidatorRegistrationsSubmitter, 0, len(bestBeaconNodeAddresses)+len(firstBeaconNodeAddresses)) clients := make(map[string]struct{}) for _, address := range bestBeaconNodeAddresses { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for blinded beacon block proposal strategy", address)) } @@ -1605,7 +1620,7 @@ func startBlockRelay(ctx context.Context, } for _, address := range firstBeaconNodeAddresses { if _, exists := clients[address]; !exists { - client, err := fetchClient(ctx, address) + client, err := fetchClient(ctx, monitor, address) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for blinded beacon block proposal strategy", address)) } diff --git a/services/beaconblockproposer/standard/propose.go b/services/beaconblockproposer/standard/propose.go index 7a4eaeff..9b3396f8 100644 --- a/services/beaconblockproposer/standard/propose.go +++ b/services/beaconblockproposer/standard/propose.go @@ -246,7 +246,7 @@ func (s *Service) proposeBlockWithAuction(ctx context.Context, for _, provider := range auctionResults.Providers { unblindedProposalProvider, isProvider := provider.(builderclient.UnblindedProposalProvider) if !isProvider { - log.Warn().Msg("Auctioneer cannot unblind the proposal") + log.Warn().Str("provider", provider.Name()).Msg("Auctioneer cannot unblind the proposal") continue } providers = append(providers, unblindedProposalProvider) @@ -345,29 +345,6 @@ func (s *Service) signProposalData(ctx context.Context, ) ( *api.VersionedSignedProposal, error, -) { - var signedProposal *api.VersionedSignedProposal - var err error - - switch proposal.Version { - case spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix, spec.DataVersionCapella: - signedProposal, err = s.signSimpleProposal(ctx, proposal, duty) - case spec.DataVersionDeneb: - signedProposal, err = s.signCompositeProposal(ctx, proposal, duty) - } - if err != nil { - return nil, err - } - - return signedProposal, nil -} - -func (s *Service) signSimpleProposal(ctx context.Context, - proposal *api.VersionedProposal, - duty *beaconblockproposer.Duty, -) ( - *api.VersionedSignedProposal, - error, ) { bodyRoot, err := proposal.BodyRoot() if err != nil { @@ -420,60 +397,17 @@ func (s *Service) signSimpleProposal(ctx context.Context, Message: proposal.Capella, Signature: sig, } - default: - return nil, errors.New("unhandled proposal version") - } - - return signedProposal, nil -} - -func (s *Service) signCompositeProposal(ctx context.Context, - proposal *api.VersionedProposal, - duty *beaconblockproposer.Duty, -) ( - *api.VersionedSignedProposal, - error, -) { - bodyRoot, err := proposal.BodyRoot() - if err != nil { - return nil, errors.Wrap(err, "failed to calculate hash tree root of proposal body") - } - - parentRoot, err := proposal.ParentRoot() - if err != nil { - return nil, errors.Wrap(err, "failed to obtain parent root of proposal") - } - - stateRoot, err := proposal.StateRoot() - if err != nil { - return nil, errors.Wrap(err, "failed to obtain state root of proposal") - } - - sig, err := s.beaconBlockSigner.SignBeaconBlockProposal(ctx, - duty.Account(), - duty.Slot(), - duty.ValidatorIndex(), - parentRoot, - stateRoot, - bodyRoot) - if err != nil { - return nil, errors.Wrap(err, "failed to sign beacon proposal") - } - - signedProposal := &api.VersionedSignedProposal{ - Version: proposal.Version, - } - - switch proposal.Version { case spec.DataVersionDeneb: signedProposal.Deneb = &apiv1deneb.SignedBlockContents{ SignedBlock: &deneb.SignedBeaconBlock{ Message: proposal.Deneb.Block, Signature: sig, }, + KZGProofs: proposal.Deneb.KZGProofs, + Blobs: proposal.Deneb.Blobs, } default: - return nil, errors.New("unhandled composite proposal version") + return nil, errors.New("unhandled proposal version") } return signedProposal, nil @@ -654,11 +588,10 @@ func (s *Service) signBlindedProposal(ctx context.Context, Signature: sig, } case spec.DataVersionDeneb: - return nil, errors.New("unsupported proposal version deneb") - // signedProposal.Deneb = &apiv1deneb.SignedBlindedBeaconBlock{ - // Message: proposal.Deneb, - // Signature: sig, - // } + signedProposal.Deneb = &apiv1deneb.SignedBlindedBeaconBlock{ + Message: proposal.Deneb, + Signature: sig, + } default: return nil, fmt.Errorf("unknown proposal version %v", signedProposal.Version) } diff --git a/services/beaconblockproposer/standard/propose_test.go b/services/beaconblockproposer/standard/propose_test.go index a74a2a84..6505e9f3 100644 --- a/services/beaconblockproposer/standard/propose_test.go +++ b/services/beaconblockproposer/standard/propose_test.go @@ -15,7 +15,6 @@ package standard_test import ( "context" - "fmt" "testing" "time" @@ -148,10 +147,6 @@ func TestPropose(t *testing.T) { s.Propose(ctx, test.data) - // TODO remove. - for _, entry := range capture.Entries() { - fmt.Printf("%v\n", entry) - } for _, err := range test.errs { require.True(t, capture.HasLog(err)) } diff --git a/services/metrics/prometheus/accountmanager.go b/services/metrics/prometheus/accountmanager.go index 1791de89..b5230235 100644 --- a/services/metrics/prometheus/accountmanager.go +++ b/services/metrics/prometheus/accountmanager.go @@ -14,6 +14,8 @@ package prometheus import ( + "errors" + "github.com/prometheus/client_golang/prometheus" ) @@ -24,7 +26,16 @@ func (s *Service) setupAccountManagerMetrics() error { Name: "accounts_total", Help: "The number of accounts managed by Vouch.", }, []string{"state"}) - return prometheus.Register(s.accountManagerAccounts) + if err := prometheus.Register(s.accountManagerAccounts); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.accountManagerAccounts = alreadyRegisteredError.ExistingCollector.(*prometheus.GaugeVec) + } else { + return err + } + } + + return nil } // Accounts sets the number of accounts in a given state. diff --git a/services/metrics/prometheus/attestation.go b/services/metrics/prometheus/attestation.go index 90fe8070..d6ba2599 100644 --- a/services/metrics/prometheus/attestation.go +++ b/services/metrics/prometheus/attestation.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -32,7 +33,12 @@ func (s *Service) setupAttestationMetrics() error { }, }) if err := prometheus.Register(s.attestationProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.attestationMarkTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -56,7 +62,12 @@ func (s *Service) setupAttestationMetrics() error { }, }) if err := prometheus.Register(s.attestationMarkTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationMarkTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.attestationProcessLatestSlot = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -66,7 +77,12 @@ func (s *Service) setupAttestationMetrics() error { Help: "The latest slot for which Vouch attested.", }) if err := prometheus.Register(s.attestationProcessLatestSlot); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationProcessLatestSlot = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } } s.attestationProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -75,7 +91,16 @@ func (s *Service) setupAttestationMetrics() error { Name: "requests_total", Help: "The number of attestation processes.", }, []string{"result"}) - return prometheus.Register(s.attestationProcessRequests) + if err := prometheus.Register(s.attestationProcessRequests); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + return nil } // AttestationsCompleted is called when an attestation process has completed. @@ -86,8 +111,9 @@ func (s *Service) AttestationsCompleted(started time.Time, slot phase0.Slot, cou for i := 0; i < count; i++ { s.attestationProcessTimer.Observe(duration) } - secsSinceStartOfSlot := time.Since(s.chainTime.StartOfSlot(slot)).Seconds() - s.attestationMarkTimer.Observe(secsSinceStartOfSlot) + if s.chainTime != nil { + s.attestationMarkTimer.Observe(time.Since(s.chainTime.StartOfSlot(slot)).Seconds()) + } s.attestationProcessLatestSlot.Set(float64(slot)) } s.attestationProcessRequests.WithLabelValues(result).Add(float64(count)) diff --git a/services/metrics/prometheus/attestationaggregation.go b/services/metrics/prometheus/attestationaggregation.go index 03fb0fee..f0900205 100644 --- a/services/metrics/prometheus/attestationaggregation.go +++ b/services/metrics/prometheus/attestationaggregation.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -32,7 +33,12 @@ func (s *Service) setupAttestationAggregationMetrics() error { }, }) if err := prometheus.Register(s.attestationAggregationProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationAggregationProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.attestationAggregationMarkTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -56,7 +62,12 @@ func (s *Service) setupAttestationAggregationMetrics() error { }, }) if err := prometheus.Register(s.attestationAggregationMarkTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationAggregationMarkTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.attestationAggregationProcessLatestSlot = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -66,7 +77,12 @@ func (s *Service) setupAttestationAggregationMetrics() error { Help: "The latest slot for which Vouch produced an aggregate attestation.", }) if err := prometheus.Register(s.attestationAggregationProcessLatestSlot); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationAggregationProcessLatestSlot = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } } s.attestationAggregationProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -76,7 +92,12 @@ func (s *Service) setupAttestationAggregationMetrics() error { Help: "The number of beacon block attestation aggregation processes.", }, []string{"result"}) if err := prometheus.Register(s.attestationAggregationProcessRequests); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationAggregationProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.attestationAggregationCoverageRatio = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -86,7 +107,16 @@ func (s *Service) setupAttestationAggregationMetrics() error { Help: "The ratio of included to possible attestations in the aggregate.", Buckets: []float64{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, }) - return prometheus.Register(s.attestationAggregationCoverageRatio) + if err := prometheus.Register(s.attestationAggregationCoverageRatio); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.attestationAggregationCoverageRatio = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } + } + + return nil } // AttestationAggregationCompleted is called when an attestation aggregationprocess has completed. @@ -94,8 +124,9 @@ func (s *Service) AttestationAggregationCompleted(started time.Time, slot phase0 // Only log times for successful completions. if result == "succeeded" { s.attestationAggregationProcessTimer.Observe(time.Since(started).Seconds()) - secsSinceStartOfSlot := time.Since(s.chainTime.StartOfSlot(slot)).Seconds() - s.attestationAggregationMarkTimer.Observe(secsSinceStartOfSlot) + if s.chainTime != nil { + s.attestationAggregationMarkTimer.Observe(time.Since(s.chainTime.StartOfSlot(slot)).Seconds()) + } s.attestationAggregationProcessLatestSlot.Set(float64(slot)) } s.attestationAggregationProcessRequests.WithLabelValues(result).Inc() diff --git a/services/metrics/prometheus/beaconcommitteesubscription.go b/services/metrics/prometheus/beaconcommitteesubscription.go index 8ca3be3c..abcac5a9 100644 --- a/services/metrics/prometheus/beaconcommitteesubscription.go +++ b/services/metrics/prometheus/beaconcommitteesubscription.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -31,7 +32,12 @@ func (s *Service) setupBeaconCommitteeSubscriptionMetrics() error { }, }) if err := prometheus.Register(s.beaconCommitteeSubscriptionProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.beaconCommitteeSubscriptionProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.beaconCommitteeSubscriptionProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -41,7 +47,12 @@ func (s *Service) setupBeaconCommitteeSubscriptionMetrics() error { Help: "The number of beacon committee subscription processes.", }, []string{"result"}) if err := prometheus.Register(s.beaconCommitteeSubscriptionProcessRequests); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.beaconCommitteeSubscriptionProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.beaconCommitteeSubscribers = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -51,7 +62,12 @@ func (s *Service) setupBeaconCommitteeSubscriptionMetrics() error { Help: "The number of beacon committee subscribed.", }) if err := prometheus.Register(s.beaconCommitteeSubscribers); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.beaconCommitteeSubscribers = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } } s.beaconCommitteeAggregators = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -60,7 +76,16 @@ func (s *Service) setupBeaconCommitteeSubscriptionMetrics() error { Name: "aggregators_total", Help: "The number of beacon committee aggregated.", }) - return prometheus.Register(s.beaconCommitteeAggregators) + if err := prometheus.Register(s.beaconCommitteeAggregators); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.beaconCommitteeAggregators = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } + } + + return nil } // BeaconCommitteeSubscriptionCompleted is called when an beacon committee subscription process has completed. diff --git a/services/metrics/prometheus/client.go b/services/metrics/prometheus/client.go index 4350db8f..4a61c123 100644 --- a/services/metrics/prometheus/client.go +++ b/services/metrics/prometheus/client.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -26,8 +27,14 @@ func (s *Service) setupClientMetrics() error { Name: "requests_total", }, []string{"provider", "operation", "result"}) if err := prometheus.Register(s.clientOperationCounter); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.clientOperationCounter = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } + s.clientOperationTimer = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "vouch", Subsystem: "client_operation", @@ -41,8 +48,14 @@ func (s *Service) setupClientMetrics() error { }, }, []string{"provider", "operation"}) if err := prometheus.Register(s.clientOperationTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.clientOperationTimer = alreadyRegisteredError.ExistingCollector.(*prometheus.HistogramVec) + } else { + return err + } } + s.strategyOperationCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "vouch", Subsystem: "strategy_operation", @@ -50,8 +63,14 @@ func (s *Service) setupClientMetrics() error { Help: "The results used by a strategy.", }, []string{"strategy", "provider", "operation"}) if err := prometheus.Register(s.strategyOperationCounter); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.strategyOperationCounter = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } + s.strategyOperationTimer = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "vouch", Subsystem: "strategy_operation", @@ -64,7 +83,16 @@ func (s *Service) setupClientMetrics() error { 3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9, 4.0, }, }, []string{"strategy", "provider", "operation"}) - return prometheus.Register(s.strategyOperationTimer) + if err := prometheus.Register(s.strategyOperationTimer); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.strategyOperationTimer = alreadyRegisteredError.ExistingCollector.(*prometheus.HistogramVec) + } else { + return err + } + } + + return nil } // ClientOperation registers an operation. diff --git a/services/metrics/prometheus/controller.go b/services/metrics/prometheus/controller.go index b7bad0f7..931d6581 100644 --- a/services/metrics/prometheus/controller.go +++ b/services/metrics/prometheus/controller.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "fmt" "time" @@ -27,7 +28,12 @@ func (s *Service) setupControllerMetrics() error { Help: "The number of epochs vouch has processed.", }) if err := prometheus.Register(s.epochsProcessed); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.epochsProcessed = alreadyRegisteredError.ExistingCollector.(prometheus.Counter) + } else { + return err + } } s.blockReceiptDelay = prometheus.NewHistogramVec(prometheus.HistogramOpts{ @@ -49,7 +55,16 @@ func (s *Service) setupControllerMetrics() error { 11.1, 11.2, 11.3, 11.4, 11.5, 11.6, 11.7, 11.8, 11.9, 12.0, }, }, []string{"epoch_slot"}) - return prometheus.Register(s.blockReceiptDelay) + if err := prometheus.Register(s.blockReceiptDelay); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.blockReceiptDelay = alreadyRegisteredError.ExistingCollector.(*prometheus.HistogramVec) + } else { + return err + } + } + + return nil } // NewEpoch is called when vouch starts processing a new epoch. diff --git a/services/metrics/prometheus/parameters.go b/services/metrics/prometheus/parameters.go index 8a9acdf9..9cc89854 100644 --- a/services/metrics/prometheus/parameters.go +++ b/services/metrics/prometheus/parameters.go @@ -21,9 +21,10 @@ import ( ) type parameters struct { - logLevel zerolog.Level - address string - chainTime chaintime.Service + logLevel zerolog.Level + address string + chainTime chaintime.Service + createServer bool } // Parameter is the interface for service parameters. @@ -58,6 +59,13 @@ func WithChainTime(chainTime chaintime.Service) Parameter { }) } +// WithCreateServer creates a web server for metrics if true. +func WithCreateServer(createServer bool) Parameter { + return parameterFunc(func(p *parameters) { + p.createServer = createServer + }) +} + // parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. func parseAndCheckParameters(params ...Parameter) (*parameters, error) { parameters := parameters{ @@ -72,9 +80,6 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) { if parameters.address == "" { return nil, errors.New("no address specified") } - if parameters.chainTime == nil { - return nil, errors.New("no chain time service specified") - } return ¶meters, nil } diff --git a/services/metrics/prometheus/scheduler.go b/services/metrics/prometheus/scheduler.go index ca16830b..36210047 100644 --- a/services/metrics/prometheus/scheduler.go +++ b/services/metrics/prometheus/scheduler.go @@ -14,6 +14,8 @@ package prometheus import ( + "errors" + "github.com/prometheus/client_golang/prometheus" ) @@ -25,7 +27,12 @@ func (s *Service) setupSchedulerMetrics() error { Help: "The number of jobs scheduled.", }, []string{"class"}) if err := prometheus.Register(s.schedulerJobsScheduled); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.schedulerJobsScheduled = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.schedulerJobsCancelled = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -35,7 +42,12 @@ func (s *Service) setupSchedulerMetrics() error { Help: "The number of scheduled jobs cancelled.", }, []string{"class"}) if err := prometheus.Register(s.schedulerJobsCancelled); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.schedulerJobsCancelled = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.schedulerJobsStarted = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -44,7 +56,16 @@ func (s *Service) setupSchedulerMetrics() error { Name: "jobs_started_total", Help: "The number of scheduled jobs started.", }, []string{"class", "trigger"}) - return prometheus.Register(s.schedulerJobsStarted) + if err := prometheus.Register(s.schedulerJobsStarted); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.schedulerJobsStarted = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + return nil } // JobScheduled is called when a job is scheduled. diff --git a/services/metrics/prometheus/service.go b/services/metrics/prometheus/service.go index f7f7af4d..ef19b93b 100644 --- a/services/metrics/prometheus/service.go +++ b/services/metrics/prometheus/service.go @@ -1,4 +1,4 @@ -// Copyright © 2020 - 2022 Attestant Limited. +// Copyright © 2020 - 2023 Attestant Limited. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -127,16 +127,18 @@ func New(_ context.Context, params ...Parameter) (*Service, error) { return nil, errors.Wrap(err, "failed to set up client metrics") } - go func() { - http.Handle("/metrics", promhttp.Handler()) - server := &http.Server{ - Addr: parameters.address, - ReadHeaderTimeout: 5 * time.Second, - } - if err := server.ListenAndServe(); err != nil { - log.Warn().Str("metrics_address", parameters.address).Err(err).Msg("Failed to run metrics server") - } - }() + if parameters.createServer { + go func() { + http.Handle("/metrics", promhttp.Handler()) + server := &http.Server{ + Addr: parameters.address, + ReadHeaderTimeout: 5 * time.Second, + } + if err := server.ListenAndServe(); err != nil { + log.Warn().Str("metrics_address", parameters.address).Err(err).Msg("Failed to run metrics server") + } + }() + } return s, nil } diff --git a/services/metrics/prometheus/service_test.go b/services/metrics/prometheus/service_test.go index 1fef4d96..6015cafc 100644 --- a/services/metrics/prometheus/service_test.go +++ b/services/metrics/prometheus/service_test.go @@ -53,20 +53,11 @@ func TestService(t *testing.T) { }, err: "problem with parameters: no address specified", }, - { - name: "ChainTimeMissing", - params: []prometheus.Parameter{ - prometheus.WithLogLevel(zerolog.Disabled), - prometheus.WithAddress("http://localhost:12345/"), - }, - err: "problem with parameters: no chain time service specified", - }, { name: "Good", params: []prometheus.Parameter{ prometheus.WithLogLevel(zerolog.Disabled), prometheus.WithAddress("http://localhost:12345/"), - prometheus.WithChainTime(chainTime), }, }, } diff --git a/services/metrics/prometheus/synccommitteeaggregation.go b/services/metrics/prometheus/synccommitteeaggregation.go index a0c1b562..30686afe 100644 --- a/services/metrics/prometheus/synccommitteeaggregation.go +++ b/services/metrics/prometheus/synccommitteeaggregation.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -32,7 +33,12 @@ func (s *Service) setupSyncCommitteeAggregationMetrics() error { }, }) if err := prometheus.Register(s.syncCommitteeAggregationProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeAggregationProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.syncCommitteeAggregationMarkTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -56,7 +62,12 @@ func (s *Service) setupSyncCommitteeAggregationMetrics() error { }, }) if err := prometheus.Register(s.syncCommitteeAggregationMarkTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeAggregationMarkTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.syncCommitteeAggregationProcessLatestSlot = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -66,7 +77,12 @@ func (s *Service) setupSyncCommitteeAggregationMetrics() error { Help: "The latest slot for which Vouch created a sync committee aggregate.", }) if err := prometheus.Register(s.syncCommitteeAggregationProcessLatestSlot); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeAggregationProcessLatestSlot = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } } s.syncCommitteeAggregationProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -76,7 +92,12 @@ func (s *Service) setupSyncCommitteeAggregationMetrics() error { Help: "The number of sync committee aggregation processes.", }, []string{"result"}) if err := prometheus.Register(s.syncCommitteeAggregationProcessRequests); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeAggregationProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.syncCommitteeAggregationCoverageRatio = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -86,7 +107,16 @@ func (s *Service) setupSyncCommitteeAggregationMetrics() error { Help: "The ratio of included to possible messages in the aggregate.", Buckets: []float64{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, }) - return prometheus.Register(s.syncCommitteeAggregationCoverageRatio) + if err := prometheus.Register(s.syncCommitteeAggregationCoverageRatio); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeAggregationCoverageRatio = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } + } + + return nil } // SyncCommitteeAggregationsCompleted is called when a sync committee aggregation process has completed. @@ -97,8 +127,9 @@ func (s *Service) SyncCommitteeAggregationsCompleted(started time.Time, slot pha for i := 0; i < count; i++ { s.syncCommitteeAggregationProcessTimer.Observe(duration) } - secsSinceStartOfSlot := time.Since(s.chainTime.StartOfSlot(slot)).Seconds() - s.syncCommitteeAggregationMarkTimer.Observe(secsSinceStartOfSlot) + if s.chainTime != nil { + s.syncCommitteeAggregationMarkTimer.Observe(time.Since(s.chainTime.StartOfSlot(slot)).Seconds()) + } s.syncCommitteeAggregationProcessLatestSlot.Set(float64(slot)) } s.syncCommitteeAggregationProcessRequests.WithLabelValues(result).Add(float64(count)) diff --git a/services/metrics/prometheus/synccommitteemessenger.go b/services/metrics/prometheus/synccommitteemessenger.go index ff81bb0b..c59a32f5 100644 --- a/services/metrics/prometheus/synccommitteemessenger.go +++ b/services/metrics/prometheus/synccommitteemessenger.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -32,7 +33,12 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error { }, }) if err := prometheus.Register(s.syncCommitteeMessageProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeMessageProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.syncCommitteeMessageMarkTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -56,7 +62,12 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error { }, }) if err := prometheus.Register(s.syncCommitteeMessageMarkTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeMessageMarkTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.syncCommitteeMessageProcessLatestSlot = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -66,7 +77,12 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error { Help: "The latest slot for which Vouch created a sync committee message.", }) if err := prometheus.Register(s.syncCommitteeMessageProcessLatestSlot); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeMessageProcessLatestSlot = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } } s.syncCommitteeMessageProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -75,7 +91,16 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error { Name: "requests_total", Help: "The number of sync committee message processes.", }, []string{"result"}) - return prometheus.Register(s.syncCommitteeMessageProcessRequests) + if err := prometheus.Register(s.syncCommitteeMessageProcessRequests); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeMessageProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + return nil } // SyncCommitteeMessagesCompleted is called when a sync committee message process has completed. @@ -86,8 +111,9 @@ func (s *Service) SyncCommitteeMessagesCompleted(started time.Time, slot phase0. for i := 0; i < count; i++ { s.syncCommitteeMessageProcessTimer.Observe(duration) } - secsSinceStartOfSlot := time.Since(s.chainTime.StartOfSlot(slot)).Seconds() - s.syncCommitteeMessageMarkTimer.Observe(secsSinceStartOfSlot) + if s.chainTime != nil { + s.syncCommitteeMessageMarkTimer.Observe(time.Since(s.chainTime.StartOfSlot(slot)).Seconds()) + } s.syncCommitteeMessageProcessLatestSlot.Set(float64(slot)) } s.syncCommitteeMessageProcessRequests.WithLabelValues(result).Add(float64(count)) diff --git a/services/metrics/prometheus/synccommitteesubscriber.go b/services/metrics/prometheus/synccommitteesubscriber.go index c16d7469..d4c80bd3 100644 --- a/services/metrics/prometheus/synccommitteesubscriber.go +++ b/services/metrics/prometheus/synccommitteesubscriber.go @@ -14,6 +14,7 @@ package prometheus import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -31,7 +32,12 @@ func (s *Service) setupSyncCommitteeSubscriptionMetrics() error { }, }) if err := prometheus.Register(s.syncCommitteeSubscriptionProcessTimer); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeSubscriptionProcessTimer = alreadyRegisteredError.ExistingCollector.(prometheus.Histogram) + } else { + return err + } } s.syncCommitteeSubscriptionProcessRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -41,7 +47,12 @@ func (s *Service) setupSyncCommitteeSubscriptionMetrics() error { Help: "The number of sync committee subscription processes.", }, []string{"result"}) if err := prometheus.Register(s.syncCommitteeSubscriptionProcessRequests); err != nil { - return err + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeSubscriptionProcessRequests = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } } s.syncCommitteeSubscribers = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -50,7 +61,16 @@ func (s *Service) setupSyncCommitteeSubscriptionMetrics() error { Name: "subscribers_total", Help: "The number of sync committee subscribed.", }) - return prometheus.Register(s.syncCommitteeSubscribers) + if err := prometheus.Register(s.syncCommitteeSubscribers); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeSubscribers = alreadyRegisteredError.ExistingCollector.(prometheus.Gauge) + } else { + return err + } + } + + return nil } // SyncCommitteeSubscriptionCompleted is called when an sync committee subscription process has completed. diff --git a/util/commit.go b/util/commit.go new file mode 100644 index 00000000..6f100fa1 --- /dev/null +++ b/util/commit.go @@ -0,0 +1,29 @@ +// Copyright © 2023 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "runtime/debug" + +// CommitHah returns the commit hash of the build, if available. +func CommitHash() string { + if info, ok := debug.ReadBuildInfo(); ok { + for _, setting := range info.Settings { + if setting.Key == "vcs.revision" { + return setting.Value + } + } + } + + return "" +}