diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 158e2f034a..b16e6fdfbe 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -101,18 +101,20 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse // Process disperse_request in, err := stream.Recv() if err != nil { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err)) } request, ok := in.GetPayload().(*pb.AuthenticatedRequest_DisperseRequest) if !ok { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError("missing DisperseBlobRequest") } blob, err := s.validateRequestAndGetBlob(ctx, request.DisperseRequest) if err != nil { for _, quorumID := range request.DisperseRequest.CustomQuorumNumbers { - s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlob") + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlobAuthenticated") } return api.NewInvalidArgError(err.Error()) } @@ -121,11 +123,13 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse // Decode public key publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) if err != nil { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)) } pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) if err != nil { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)) } @@ -160,13 +164,16 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse select { case in = <-resultCh: case err := <-errCh: + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err)) case <-ctx.Done(): + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError("context deadline exceeded") } challengeReply, ok := in.GetPayload().(*pb.AuthenticatedRequest_AuthenticationData) if !ok { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError("expected AuthenticationData") } @@ -175,11 +182,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse err = s.authenticator.AuthenticateBlobRequest(blob.RequestHeader.BlobAuthHeader) if err != nil { + s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated") return api.NewInvalidArgError(fmt.Sprintf("failed to authenticate blob request: %v", err)) } // Disperse the blob - reply, err := s.disperseBlob(ctx, blob, authenticatedAddress) + reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated") if err != nil { s.logger.Info("failed to disperse blob", "err", err) return err @@ -207,14 +215,14 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob return nil, api.NewInvalidArgError(err.Error()) } - reply, err := s.disperseBlob(ctx, blob, "") + reply, err := s.disperseBlob(ctx, blob, "", "DisperseBlob") if err != nil { s.logger.Info("failed to disperse blob", "err", err) } return reply, err } -func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) { +func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds })) @@ -232,7 +240,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut if err != nil { for _, param := range securityParams { quorumId := string(param.QuorumID) - s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), quorumId, blobSize, "DisperseBlob") + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), quorumId, blobSize, apiMethodName) } return nil, api.NewInvalidArgError(err.Error()) } @@ -251,7 +259,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut if err != nil { for _, param := range securityParams { quorumId := string(param.QuorumID) - s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, "DisperseBlob") + s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, apiMethodName) } s.logger.Error("failed to store blob", "err", err) return nil, api.NewInternalError("failed to store blob, please try again later") @@ -259,7 +267,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut for _, param := range securityParams { quorumId := string(param.QuorumID) - s.metrics.HandleSuccessfulRequest(quorumId, blobSize, "DisperseBlob") + s.metrics.HandleSuccessfulRequest(quorumId, blobSize, apiMethodName) } s.logger.Info("successfully received a new blob: ", "key", metadataKey.String()) @@ -498,12 +506,14 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR requestID := req.GetRequestId() if len(requestID) == 0 { + s.metrics.HandleInvalidArgRequest("GetBlobStatus") return nil, api.NewInvalidArgError("request_id must not be empty") } s.logger.Info("received a new blob status request", "requestID", string(requestID)) metadataKey, err := disperser.ParseBlobKey(string(requestID)) if err != nil { + s.metrics.HandleInvalidArgRequest("GetBlobStatus") return nil, api.NewInvalidArgError(fmt.Sprintf("failed to parse the requestID: %s", err.Error())) } @@ -600,6 +610,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true) if err != nil { + s.metrics.HandleInvalidArgRequest("RetrieveBlob") return nil, api.NewInvalidArgError(err.Error()) } diff --git a/disperser/metrics.go b/disperser/metrics.go index 19902b77a6..40f99851a2 100644 --- a/disperser/metrics.go +++ b/disperser/metrics.go @@ -135,6 +135,16 @@ func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, met }).Add(float64(blobBytes)) } +// HandleInvalidArgRequest updates the number of invalid argument requests +func (g *Metrics) HandleInvalidArgRequest(method string) { + g.NumBlobRequests.With(prometheus.Labels{ + "status_code": codes.InvalidArgument.String(), + "status": "failed", + "quorum": "", + "method": method, + }).Inc() +} + // HandleSystemRateLimitedRequest updates the number of system rate limited requests and the size of the blob func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, method string) { g.NumBlobRequests.With(prometheus.Labels{