Skip to content

Commit

Permalink
Account the invalid requests in disperser metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Mar 29, 2024
1 parent 67b1872 commit a83bae0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
25 changes: 18 additions & 7 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,20 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
// Process disperse_request
in, err := stream.Recv()
if err != nil {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
}

request, ok := in.GetPayload().(*pb.AuthenticatedRequest_DisperseRequest)
if !ok {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError("missing DisperseBlobRequest")
}

blob, err := s.validateRequestAndGetBlob(ctx, request.DisperseRequest)
if err != nil {
for _, quorumID := range request.DisperseRequest.CustomQuorumNumbers {
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlob")
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlobAuthenticated")
}
return api.NewInvalidArgError(err.Error())
}
Expand All @@ -121,11 +123,13 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
// Decode public key
publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID)
if err != nil {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err))
}

pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err))
}

Expand Down Expand Up @@ -160,13 +164,16 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
select {
case in = <-resultCh:
case err := <-errCh:
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
case <-ctx.Done():
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError("context deadline exceeded")
}

challengeReply, ok := in.GetPayload().(*pb.AuthenticatedRequest_AuthenticationData)
if !ok {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError("expected AuthenticationData")
}

Expand All @@ -175,11 +182,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse

err = s.authenticator.AuthenticateBlobRequest(blob.RequestHeader.BlobAuthHeader)
if err != nil {
s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
return api.NewInvalidArgError(fmt.Sprintf("failed to authenticate blob request: %v", err))
}

// Disperse the blob
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress)
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated")
if err != nil {
s.logger.Info("failed to disperse blob", "err", err)
return err
Expand Down Expand Up @@ -207,14 +215,14 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
return nil, api.NewInvalidArgError(err.Error())
}

reply, err := s.disperseBlob(ctx, blob, "")
reply, err := s.disperseBlob(ctx, blob, "", "DisperseBlob")
if err != nil {
s.logger.Info("failed to disperse blob", "err", err)
}
return reply, err
}

func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) {
func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds
}))
Expand All @@ -232,7 +240,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), quorumId, blobSize, "DisperseBlob")
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), quorumId, blobSize, apiMethodName)
}
return nil, api.NewInvalidArgError(err.Error())
}
Expand All @@ -251,15 +259,15 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, "DisperseBlob")
s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, apiMethodName)
}
s.logger.Error("failed to store blob", "err", err)
return nil, api.NewInternalError("failed to store blob, please try again later")
}

for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleSuccessfulRequest(quorumId, blobSize, "DisperseBlob")
s.metrics.HandleSuccessfulRequest(quorumId, blobSize, apiMethodName)
}

s.logger.Info("successfully received a new blob: ", "key", metadataKey.String())
Expand Down Expand Up @@ -498,12 +506,14 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR

requestID := req.GetRequestId()
if len(requestID) == 0 {
s.metrics.HandleInvalidArgRequest("GetBlobStatus")
return nil, api.NewInvalidArgError("request_id must not be empty")
}

s.logger.Info("received a new blob status request", "requestID", string(requestID))
metadataKey, err := disperser.ParseBlobKey(string(requestID))
if err != nil {
s.metrics.HandleInvalidArgRequest("GetBlobStatus")
return nil, api.NewInvalidArgError(fmt.Sprintf("failed to parse the requestID: %s", err.Error()))
}

Expand Down Expand Up @@ -600,6 +610,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob

origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
if err != nil {
s.metrics.HandleInvalidArgRequest("RetrieveBlob")
return nil, api.NewInvalidArgError(err.Error())
}

Expand Down
10 changes: 10 additions & 0 deletions disperser/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, met
}).Add(float64(blobBytes))
}

// HandleInvalidArgRequest updates the number of invalid argument requests
func (g *Metrics) HandleInvalidArgRequest(method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status_code": codes.InvalidArgument.String(),
"status": "failed",
"quorum": "",
"method": method,
}).Inc()
}

// HandleSystemRateLimitedRequest updates the number of system rate limited requests and the size of the blob
func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
Expand Down

0 comments on commit a83bae0

Please sign in to comment.