feat: add sp metrics for internal/external sp (#101)
randyahx authored Nov 1, 2023
1 parent 047fb96 · commit 48ff5b3
Showing 7 changed files with 90 additions and 21 deletions.
5 changes: 5 additions & 0 deletions config/config.go
@@ -14,6 +14,7 @@ type Config struct {
AlertConfig AlertConfig `json:"alert_config"`
DBConfig DBConfig `json:"db_config"`
MetricsConfig MetricsConfig `json:"metrics_config"`
SPConfig SPConfig `json:"sp_config"`
}

type GreenfieldConfig struct {
@@ -125,6 +126,10 @@ func (cfg *DBConfig) Validate() {
}
}

type SPConfig struct {
InternalSPEndpoints []string `json:"internal_sp_endpoints"`
}

type MetricsConfig struct {
Port uint16 `json:"port"`
}
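
For context, a minimal, self-contained sketch of how the new sp_config block deserializes into the SPConfig struct added above. The struct and its JSON tag come from this commit; the standalone program, the sample endpoint list, and the use of the standard encoding/json package are illustrative assumptions, not part of the change.

package main

import (
    "encoding/json"
    "fmt"
)

// SPConfig mirrors the struct added to config/config.go in this commit.
type SPConfig struct {
    InternalSPEndpoints []string `json:"internal_sp_endpoints"`
}

func main() {
    // Illustrative JSON shaped like the sp_config block added to config/config.json.
    raw := []byte(`{"internal_sp_endpoints": ["https://greenfield-sp.bnbchain.org", "https://greenfield-sp.nodereal.io"]}`)

    var cfg SPConfig
    if err := json.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.InternalSPEndpoints)
}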
11 changes: 11 additions & 0 deletions config/config.json
@@ -39,6 +39,17 @@
"max_open_conns": 40,
"debug_mode": true
},
"sp_config": {
"internal_sp_endpoints": [
"https://greenfield-sp.bnbchain.org",
"https://greenfield-sp.nodereal.io",
"https://greenfield-sp.ninicoin.io",
"https://greenfield-sp.defibit.io",
"https://greenfield-sp.nariox.org",
"https://greenfield-sp.lumibot.org",
"https://greenfield-sp.voltbot.io"
]
},
"alert_config": {
"interval": 300,
"identity": "your_identity",
3 changes: 1 addition & 2 deletions executor/executor.go
@@ -301,7 +301,7 @@ func (e *Executor) QueryChallengeSlashCoolingOffPeriod() (uint64, error) {
client := e.clients.GetClient()
params, err := client.ChallengeParams(context.Background(), &challengetypes.QueryParamsRequest{})
if err != nil {
logging.Logger.Errorf("query challenge params failed, err=%+v", err.Error())
logging.Logger.Errorf("query slash cooling off period failed, err=%+v", err.Error())
return 0, err
}
logging.Logger.Infof("challenge slash cooling off period: %d", params.Params.SlashCoolingOffPeriod)
@@ -345,7 +345,6 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) {
logging.Logger.Errorf("executor failed to query storage provider %s, err=%+v", address, err.Error())
return "", err
}
logging.Logger.Infof("response res.endpoint %s", res.Endpoint)

return res.Endpoint, nil
}
39 changes: 27 additions & 12 deletions metrics/metrics.go
@@ -24,7 +24,8 @@ const (
MetricVerifiedChallengeSuccess = "challenge_success"
MetricHeartbeatEvents = "heartbeat_events"
MetricHashVerifierErr = "hash_verifier_error_count"
MetricSpAPIErr = "hash_verifier_sp_api_error"
MetricInternalSpAPIErr = "hash_verifier_internal_sp_api_error"
MetricExternalSpAPIErr = "hash_verifier_external_sp_api_error"
MetricHashVerifierDuration = "hash_verifier_duration"

// Vote Broadcaster
@@ -125,17 +126,24 @@ func NewMetricService(config *config.Config) *MetricService {

hashVerifierErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricHashVerifierErr,
Help: "Hash verifier error count",
Help: "Verifier error count",
})
ms[MetricHashVerifierErr] = hashVerifierErrCountMetric
prometheus.MustRegister(hashVerifierErrCountMetric)

hashVerifierSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricSpAPIErr,
Help: "Hash verifier SP API error count",
hashVerifierInternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricInternalSpAPIErr,
Help: "Internal sp error count",
})
ms[MetricSpAPIErr] = hashVerifierSpApiErrCountMetric
prometheus.MustRegister(hashVerifierSpApiErrCountMetric)
ms[MetricInternalSpAPIErr] = hashVerifierInternalSpApiErrCountMetric
prometheus.MustRegister(hashVerifierInternalSpApiErrCountMetric)

hashVerifierExternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricExternalSpAPIErr,
Help: "External sp error count",
})
ms[MetricExternalSpAPIErr] = hashVerifierExternalSpApiErrCountMetric
prometheus.MustRegister(hashVerifierExternalSpApiErrCountMetric)

// Broadcaster
broadcasterErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{
@@ -281,15 +289,22 @@ func (m *MetricService) IncHeartbeatEvents() {
func (m *MetricService) IncHashVerifierErr(err error) {
if err != nil {
logging.Logger.Errorf("verifier error count increased, %s", err.Error())
m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc()
}
}

func (m *MetricService) IncHashVerifierInternalSpApiErr(err error) {
if err != nil {
logging.Logger.Errorf("verifier internal sp error count increased, %s", err.Error())
m.MetricsMap[MetricInternalSpAPIErr].(prometheus.Counter).Inc()
}
m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc()
}

func (m *MetricService) IncHashVerifierSpApiErr(err error) {
func (m *MetricService) IncHashVerifierExternalSpApiErr(err error) {
if err != nil {
logging.Logger.Errorf("verifier sp api error count increased, %s", err.Error())
logging.Logger.Errorf("verifier external sp error count increased, %s", err.Error())
m.MetricsMap[MetricExternalSpAPIErr].(prometheus.Counter).Inc()
}
m.MetricsMap[MetricSpAPIErr].(prometheus.Counter).Inc()
}

// Broadcaster
@@ -346,8 +361,8 @@ func (m *MetricService) SetSubmitterDuration(duration time.Duration) {
func (m *MetricService) IncSubmitterErr(err error) {
if err != nil {
logging.Logger.Errorf("submitter error count increased, %s", err.Error())
m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc()
}
m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc()
}

// Attest Monitor
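
A small, self-contained sketch of the counting pattern the metrics changes above follow: one Prometheus counter per SP class, incremented only when an error is actually present (the same guard that now wraps the submitter counter). The counter names and help strings match the ones registered above; the standalone program, the record helper, and the use of prometheus/testutil to read counter values are illustrative assumptions.

package main

import (
    "errors"
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Two counters following the names added in metrics/metrics.go above.
    internalErrs := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "hash_verifier_internal_sp_api_error",
        Help: "Internal sp error count",
    })
    externalErrs := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "hash_verifier_external_sp_api_error",
        Help: "External sp error count",
    })
    prometheus.MustRegister(internalErrs, externalErrs)

    // Mirror of the Inc* helpers: only count when an error actually occurred.
    record := func(c prometheus.Counter, err error) {
        if err != nil {
            c.Inc()
        }
    }

    record(internalErrs, errors.New("internal sp unreachable"))
    record(externalErrs, nil) // no error, so the external counter stays at 0

    fmt.Println(testutil.ToFloat64(internalErrs)) // 1
    fmt.Println(testutil.ToFloat64(externalErrs)) // 0
}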
2 changes: 2 additions & 0 deletions submitter/tx_submitter.go
@@ -206,6 +206,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd
}
return err
}
s.metricService.IncSubmitterErr(err)
} else {
logging.Logger.Errorf("submitter failed for challengeId: %d, attempts: %d", event.ChallengeId, submittedAttempts)
}
@@ -217,6 +218,7 @@
err = s.DataProvider.UpdateEventStatus(event.ChallengeId, model.Submitted)
if err != nil {
logging.Logger.Errorf("submitter succeeded in attesting but failed to update database, err=%+v", err.Error())
s.metricService.IncSubmitterErr(err)
continue
}

1 change: 1 addition & 0 deletions verifier/const.go
@@ -3,3 +3,4 @@ package verifier
import "time"

var VerifyHashLoopInterval = 2 * time.Second
var UpdateDeduplicationInterval = 24 * time.Hour
50 changes: 43 additions & 7 deletions verifier/hash_verifier.go
@@ -59,6 +59,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
}

func (v *Verifier) VerifyHashLoop() {
go v.updateDeduplicationIntervalLoop()
for {
err := v.verifyHash()
if err != nil {
@@ -68,6 +69,7 @@ func (v *Verifier) VerifyHashLoop() {
time.Sleep(VerifyHashLoopInterval)
}
}

func (v *Verifier) verifyHash() error {
// Read unprocessed event from db with lowest challengeId
currentHeight, err := v.executor.GetCachedBlockHeight()
@@ -194,7 +196,7 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
return err
}
chainRootHash := checksums[event.RedundancyIndex+1]
logging.Logger.Infof("chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId)
logging.Logger.Infof("fetched chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId)

// Call sp for challenge result
challengeRes := &types.ChallengeResult{}
@@ -207,14 +209,19 @@
return challengeResErr
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)
if challengeResErr != nil {
v.metricService.IncHashVerifierSpApiErr(err)
if v.isInternalSP(endpoint) {
v.metricService.IncHashVerifierInternalSpApiErr(challengeResErr)
logging.Logger.Infof("challenge succeeded due to internal sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr)
} else {
v.metricService.IncHashVerifierExternalSpApiErr(challengeResErr)
logging.Logger.Infof("challenge succeeded due to external sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr)
}
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched)
if err != nil {
v.metricService.IncHashVerifierErr(err)
logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId)
}
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeSuccess()
return err
}

@@ -233,9 +240,8 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
spChecksums = append(spChecksums, checksum)
}
originalSpRootHash := hash.GenerateChecksum(bytes.Join(spChecksums, []byte("")))
logging.Logger.Infof("SpRootHash before replacing: %s for challengeId: %d", hex.EncodeToString(originalSpRootHash), event.ChallengeId)
spRootHash := v.computeRootHash(event.SegmentIndex, pieceData, spChecksums)
logging.Logger.Infof("SpRootHash after replacing: %s for challengeId: %d", hex.EncodeToString(spRootHash), event.ChallengeId)
logging.Logger.Infof("hash verification for challengeId: %d, Fetched Original SpRootHash: %s, Locally Computed SpRootHash: %s, Fetched ChainRootHash: %s", event.ChallengeId, hex.EncodeToString(originalSpRootHash), hex.EncodeToString(spRootHash), hex.EncodeToString(chainRootHash))
// Update database after comparing
err = v.compareHashAndUpdate(event.ChallengeId, chainRootHash, spRootHash)
if err != nil {
@@ -267,9 +273,14 @@ func (v *Verifier) preCheck(event *model.Event, currentHeight uint64) error {
if heartbeatInterval == 0 {
panic("heartbeat interval should not zero, potential bug")
}
if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > v.deduplicationInterval {

v.mtx.Lock()
deduplicationInterval := v.deduplicationInterval
v.mtx.Unlock()

if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > deduplicationInterval {
found, err := v.dataProvider.IsEventExistsBetween(event.ObjectId, event.SpOperatorAddress,
event.ChallengeId-v.deduplicationInterval, event.ChallengeId-1)
event.ChallengeId-deduplicationInterval, event.ChallengeId-1)
if err != nil {
logging.Logger.Errorf("verifier failed to retrieve information for event %d, err=%+v", event.ChallengeId, err.Error())
return err
@@ -298,6 +309,7 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte
return err
}
// update metrics if no err
logging.Logger.Infof("challenge failed for challengeId: %d, hash matched", challengeId)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeFailed()
return err
@@ -307,7 +319,31 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte
return err
}
// update metrics if no err
logging.Logger.Infof("challenge succeeded for challengeId: %d, hash mismatched", challengeId)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeSuccess()
return err
}

func (v *Verifier) isInternalSP(spEndpoint string) bool {
for _, internalEndpoint := range v.config.SPConfig.InternalSPEndpoints {
if strings.Contains(spEndpoint, internalEndpoint) {
return true
}
}
return false
}

func (v *Verifier) updateDeduplicationIntervalLoop() {
ticker := time.NewTicker(UpdateDeduplicationInterval)
for range ticker.C {
updatedDeduplicationInterval, err := v.executor.QueryChallengeSlashCoolingOffPeriod()
if err != nil {
logging.Logger.Errorf("error updating deduplication interval, err=%s", err.Error())
return
}
v.mtx.Lock()
v.deduplicationInterval = updatedDeduplicationInterval
v.mtx.Unlock()
}
}
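
A self-contained sketch of the classification rule isInternalSP applies: an endpoint counts as internal if it contains any entry from internal_sp_endpoints. The matching logic mirrors the method above; the free-function signature, the sample internal list, and the external example endpoint are illustrative assumptions (the real method reads the list from v.config.SPConfig.InternalSPEndpoints).

package main

import (
    "fmt"
    "strings"
)

// isInternalSP is an illustrative stand-in for Verifier.isInternalSP above:
// an endpoint is internal if it contains any configured internal endpoint.
func isInternalSP(spEndpoint string, internalEndpoints []string) bool {
    for _, internalEndpoint := range internalEndpoints {
        if strings.Contains(spEndpoint, internalEndpoint) {
            return true
        }
    }
    return false
}

func main() {
    // Internal endpoints taken from the sp_config sample in config.json above.
    internal := []string{
        "https://greenfield-sp.bnbchain.org",
        "https://greenfield-sp.nodereal.io",
    }
    fmt.Println(isInternalSP("https://greenfield-sp.bnbchain.org", internal))   // true
    fmt.Println(isInternalSP("https://some-external-sp.example.com", internal)) // false
}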
