diff --git a/core/job/handler/v1beta1/job.go b/core/job/handler/v1beta1/job.go index 7dc5fe28b0..8434ecae97 100644 --- a/core/job/handler/v1beta1/job.go +++ b/core/job/handler/v1beta1/job.go @@ -8,6 +8,8 @@ import ( "time" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/goto/optimus/core/job" @@ -15,18 +17,12 @@ import ( "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" "github.com/goto/optimus/internal/models" - "github.com/goto/optimus/internal/telemetry" "github.com/goto/optimus/internal/utils/filter" "github.com/goto/optimus/internal/writer" pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1" "github.com/goto/optimus/sdk/plugin" ) -const ( - metricReplaceAllDuration = "job_replace_all_duration_seconds" - metricRefreshDuration = "job_refresh_duration_seconds" -) - type JobHandler struct { l log.Logger jobService JobService @@ -43,6 +39,14 @@ func NewJobHandler(jobService JobService, changeLogService ChangeLogService, log } } +var jobReplaceAllDurationMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "job_replace_all_duration_seconds", +}, []string{"project", "namespace"}) + +var jobRefreshDurationMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "job_refresh_duration_seconds", +}, []string{"project"}) + type JobModificationService interface { Add(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec) ([]job.Name, error) Update(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec) ([]job.Name, error) @@ -222,12 +226,6 @@ func (jh *JobHandler) ChangeJobNamespace(ctx context.Context, changeRequest *pb. return nil, errors.GRPCErr(err, errorMsg) } - telemetry.NewCounter("job_namespace_migrations_total", map[string]string{ - "project": jobSourceTenant.ProjectName().String(), - "namespace_source": jobSourceTenant.NamespaceName().String(), - "namespace_destination": jobNewTenant.NamespaceName().String(), - }).Inc() - return &pb.ChangeJobNamespaceResponse{}, nil } @@ -524,10 +522,11 @@ func (jh *JobHandler) ReplaceAllJobSpecifications(stream pb.JobSpecificationServ processDuration := time.Since(startTime) jh.l.Debug("finished replacing all job specifications for project [%s] namespace [%s], took %s", request.GetProjectName(), request.GetNamespaceName(), processDuration) - telemetry.NewGauge(metricReplaceAllDuration, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - }).Add(processDuration.Seconds()) + + jobReplaceAllDurationMetric.WithLabelValues( + jobTenant.ProjectName().String(), + jobTenant.NamespaceName().String(), + ).Add(processDuration.Seconds()) } if len(errNamespaces) > 0 { errMessageSummary := strings.Join(errMessages, "\n") @@ -543,9 +542,7 @@ func (jh *JobHandler) RefreshJobs(request *pb.RefreshJobsRequest, stream pb.JobS startTime := time.Now() defer func() { processDuration := time.Since(startTime) - telemetry.NewGauge(metricRefreshDuration, map[string]string{ - "project": request.ProjectName, - }).Add(processDuration.Seconds()) + jobRefreshDurationMetric.WithLabelValues(request.ProjectName).Add(processDuration.Seconds()) jh.l.Debug("finished refreshing jobs for project [%s], took %s", request.GetProjectName(), processDuration) }() @@ -790,11 +787,11 @@ func (jh *JobHandler) JobInspect(ctx context.Context, req *pb.JobInspectRequest) } func raiseJobEventMetric(jobTenant tenant.Tenant, state string, metricValue int) { - telemetry.NewCounter(job.MetricJobEvent, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - "status": state, - }).Add(float64(metricValue)) + job.EventMetric.WithLabelValues( + jobTenant.ProjectName().String(), + jobTenant.NamespaceName().String(), + state, + ).Add(float64(metricValue)) } func toValidateResultProto(result map[job.Name][]dto.ValidateResult) map[string]*pb.ValidateResponse_ResultList { diff --git a/core/job/job.go b/core/job/job.go index 095ba63ed0..34ba9e446a 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -4,6 +4,9 @@ import ( "fmt" "strings" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/goto/optimus/core/resource" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" @@ -19,7 +22,6 @@ const ( UpstreamStateResolved UpstreamState = "resolved" UpstreamStateUnresolved UpstreamState = "unresolved" - MetricJobEvent = "job_events_total" MetricJobEventStateAdded = "added" MetricJobEventStateUpdated = "updated" MetricJobEventStateDeleted = "deleted" @@ -30,10 +32,6 @@ const ( MetricJobEventDisabled = "disabled" MetricJobEventFoundDirty = "found_dirty" - MetricJobValidation = "job_validation" - - MetricJobRefreshResourceDownstream = "refresh_resource_downstream_total" - UnspecifiedImpactChange UpdateImpact = "unspecified_impact" JobInternalImpact UpdateImpact = "internal_impact" JobBehaviourImpact UpdateImpact = "behaviour_impact" @@ -43,6 +41,18 @@ const ( DeployStateFailed DeployState = "failed" ) +var EventMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "job_events_total", +}, []string{"project", "namespace", "status"}) + +var ValidationMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "job_validation", +}, []string{"project", "namespace", "stage", "success"}) + +var RefreshResourceDownstreamMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "refresh_resource_downstream_total", +}, []string{"project", "status"}) + type Job struct { tenant tenant.Tenant diff --git a/core/job/service/job_service.go b/core/job/service/job_service.go index 211f11239b..0414ec0707 100644 --- a/core/job/service/job_service.go +++ b/core/job/service/job_service.go @@ -21,7 +21,6 @@ import ( "github.com/goto/optimus/internal/errors" "github.com/goto/optimus/internal/lib/tree" "github.com/goto/optimus/internal/lib/window" - "github.com/goto/optimus/internal/telemetry" "github.com/goto/optimus/internal/utils/filter" "github.com/goto/optimus/internal/writer" "github.com/goto/optimus/sdk/plugin" @@ -812,11 +811,10 @@ func (j *JobService) RefreshResourceDownstream(ctx context.Context, resourceURNs status = "failed" } - counter := telemetry.NewCounter(job.MetricJobRefreshResourceDownstream, map[string]string{ - "project": projectName.String(), - "status": status, - }) - counter.Add(float64(len(jobNames))) + job.RefreshResourceDownstreamMetric.WithLabelValues( + projectName.String(), + status, + ).Add(float64(len(jobNames))) } return me.ToErr() @@ -1338,11 +1336,11 @@ func (*JobService) groupDownstreamPerProject(downstreams []*job.Downstream) map[ } func raiseJobEventMetric(jobTenant tenant.Tenant, state string, metricValue int) { - telemetry.NewCounter(job.MetricJobEvent, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - "status": state, - }).Add(float64(metricValue)) + job.EventMetric.WithLabelValues( + jobTenant.ProjectName().String(), + jobTenant.NamespaceName().String(), + state, + ).Add(float64(metricValue)) } func (j *JobService) identifyUpstreamURNs(ctx context.Context, tenantWithDetails *tenant.WithDetails, spec *job.Spec) ([]resource.URN, error) { @@ -1941,14 +1939,12 @@ func (*JobService) validateWindow(tenantDetails *tenant.WithDetails, windowConfi } func registerJobValidationMetric(tnnt tenant.Tenant, stage dto.ValidateStage, success bool) { - counter := telemetry.NewCounter(job.MetricJobValidation, map[string]string{ - "project": tnnt.ProjectName().String(), - "namespace": tnnt.NamespaceName().String(), - "stage": stage.String(), - "success": fmt.Sprintf("%t", success), - }) - - counter.Add(1) + job.ValidationMetric.WithLabelValues( + tnnt.ProjectName().String(), + tnnt.NamespaceName().String(), + stage.String(), + fmt.Sprintf("%t", success), + ).Add(1) } func (j *JobService) GetDownstreamByResourceURN(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (job.DownstreamList, error) { diff --git a/core/resource/handler/v1beta1/resource.go b/core/resource/handler/v1beta1/resource.go index c90b1c2579..72d17e8bbc 100644 --- a/core/resource/handler/v1beta1/resource.go +++ b/core/resource/handler/v1beta1/resource.go @@ -8,20 +8,24 @@ import ( "time" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/types/known/structpb" "github.com/goto/optimus/core/resource" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" - "github.com/goto/optimus/internal/telemetry" "github.com/goto/optimus/internal/writer" pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1" ) -const ( - metricResourceEvents = "resource_events_total" - metricResourcesUploadAllDuration = "resource_upload_all_duration_seconds_total" -) +var resourceEventsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "resource_events_total", +}, []string{"project", "namespace", "datastore", "type", "status"}) + +var resourcesUploadAllDurationMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "resource_upload_all_duration_seconds_total", +}, []string{"operator_name", "event_type"}) type ResourceService interface { Create(ctx context.Context, res *resource.Resource) error @@ -129,10 +133,11 @@ func (rh ResourceHandler) DeployResourceSpecification(stream pb.ResourceService_ } processDuration := time.Since(startTime) - telemetry.NewGauge(metricResourcesUploadAllDuration, map[string]string{ - "project": tnnt.ProjectName().String(), - "namespace": tnnt.NamespaceName().String(), - }).Add(processDuration.Seconds()) + + resourcesUploadAllDurationMetric.WithLabelValues( + tnnt.ProjectName().String(), + tnnt.NamespaceName().String(), + ).Add(processDuration.Seconds()) } if len(errNamespaces) > 0 { @@ -383,12 +388,6 @@ func (rh ResourceHandler) ChangeResourceNamespace(ctx context.Context, req *pb.C return nil, errors.GRPCErr(err, "failed to update resource "+req.GetResourceName()) } - telemetry.NewCounter("resource_namespace_migrations_total", map[string]string{ - "project": tnnt.ProjectName().String(), - "namespace_source": tnnt.NamespaceName().String(), - "namespace_destination": newTnnt.NamespaceName().String(), - }).Inc() - return &pb.ChangeResourceNamespaceResponse{}, nil } @@ -542,13 +541,13 @@ func toResourceProto(res *resource.Resource) (*pb.ResourceSpecification, error) } func raiseResourceDatastoreEventMetric(jobTenant tenant.Tenant, datastoreName, resourceKind, state string) { - telemetry.NewCounter(metricResourceEvents, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - "datastore": datastoreName, - "type": resourceKind, - "status": state, - }).Inc() + resourceEventsTotal.WithLabelValues( + jobTenant.ProjectName().String(), + jobTenant.NamespaceName().String(), + datastoreName, + resourceKind, + state, + ).Inc() } func toChangelogProto(cl *resource.ChangeLog) *pb.ResourceChangelog { diff --git a/core/resource/service/backup_service.go b/core/resource/service/backup_service.go index 7313666655..fbb2f6c135 100644 --- a/core/resource/service/backup_service.go +++ b/core/resource/service/backup_service.go @@ -6,22 +6,26 @@ import ( "time" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/goto/optimus/core/resource" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" - "github.com/goto/optimus/internal/telemetry" ) const ( // recentBackupWindowMonths contains the window interval to consider for recent backups recentBackupWindowMonths = -3 - metricBackupRequest = "resource_backup_requests_total" backupRequestStatusSuccess = "success" backupRequestStatusFailed = "failed" ) +var backupRequestMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "resource_backup_requests_total", +}, []string{"project", "namespace", "resource", "status"}) + type BackupRepository interface { GetByID(ctx context.Context, id resource.BackupID) (*resource.Backup, error) GetAll(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Backup, error) @@ -142,10 +146,10 @@ func raiseBackupRequestMetrics(jobTenant tenant.Tenant, backupResult *resource.B } func raiseBackupRequestMetric(jobTenant tenant.Tenant, resourceName, state string) { - telemetry.NewCounter(metricBackupRequest, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - "resource": resourceName, - "status": state, - }).Inc() + backupRequestMetric.WithLabelValues( + jobTenant.ProjectName().String(), + jobTenant.NamespaceName().String(), + resourceName, + state, + ).Inc() } diff --git a/core/scheduler/service/events_service.go b/core/scheduler/service/events_service.go index 53d28fa607..04839206a4 100644 --- a/core/scheduler/service/events_service.go +++ b/core/scheduler/service/events_service.go @@ -7,12 +7,13 @@ import ( "strings" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/goto/optimus/core/scheduler" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/compiler" "github.com/goto/optimus/internal/errors" - "github.com/goto/optimus/internal/telemetry" ) const ( @@ -20,6 +21,10 @@ const ( NotificationSchemePagerDuty = "pagerduty" ) +var jobrunAlertsMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobrun_alerts_total", +}, []string{"project", "namespace", "type"}) + type Notifier interface { io.Closer Notify(ctx context.Context, attr scheduler.NotifyAttrs) error @@ -179,11 +184,11 @@ func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error } } } - telemetry.NewCounter("jobrun_alerts_total", map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "type": event.Type.String(), - }).Inc() + jobrunAlertsMetric.WithLabelValues( + event.Tenant.ProjectName().String(), + event.Tenant.NamespaceName().String(), + event.Type.String(), + ).Inc() } } return multierror.ToErr() diff --git a/core/scheduler/service/job_run_service.go b/core/scheduler/service/job_run_service.go index f75fc93d5e..c7149cce01 100644 --- a/core/scheduler/service/job_run_service.go +++ b/core/scheduler/service/job_run_service.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "strings" "time" "github.com/google/uuid" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/goto/optimus/core/event" "github.com/goto/optimus/core/event/moderator" @@ -21,7 +22,6 @@ import ( "github.com/goto/optimus/internal/lib/interval" "github.com/goto/optimus/internal/lib/window" "github.com/goto/optimus/internal/models" - "github.com/goto/optimus/internal/telemetry" ) type metricType string @@ -32,10 +32,18 @@ func (m metricType) String() string { const ( scheduleDelay metricType = "schedule_delay" - - metricJobRunEvents = "jobrun_events_total" ) +var jobRunEventsMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "operator_stats", + Help: "total job run events received", +}, []string{"operator_name", "event_type"}) + +var jobRunDdurationsBreakdownSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "jobrun_durations_breakdown_seconds", + Help: "operator wise time spent", +}, []string{"project", "namespace", "job", "type"}) + type JobRepository interface { GetJob(ctx context.Context, name tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error) GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error) @@ -343,12 +351,13 @@ func (s *JobRunService) registerNewJobRun(ctx context.Context, tenant tenant.Ten return err } - telemetry.NewGauge("jobrun_durations_breakdown_seconds", map[string]string{ - "project": tenant.ProjectName().String(), - "namespace": tenant.NamespaceName().String(), - "job": jobName.String(), - "type": scheduleDelay.String(), - }).Set(float64(time.Now().Unix() - scheduledAt.Unix())) + jobRunDdurationsBreakdownSeconds.WithLabelValues( + tenant.ProjectName().String(), + tenant.NamespaceName().String(), + jobName.String(), + scheduleDelay.String(), + ).Set(float64(time.Now().Unix() - scheduledAt.Unix())) + return nil } @@ -454,21 +463,11 @@ func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Ev } var slaBreachedJobRunScheduleTimes []time.Time event.SLAObjectList, slaBreachedJobRunScheduleTimes = s.filterSLAObjects(ctx, event) - err := s.repo.UpdateSLA(ctx, event.JobName, event.Tenant.ProjectName(), slaBreachedJobRunScheduleTimes) if err != nil { s.l.Error("error updating job run sla status", err) return err } - err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "name": event.JobName.String(), - "status": scheduler.SLAMissEvent.String(), - }, 1) - if err != nil { - s.l.Error("failed metric push", err) - } return nil } @@ -506,17 +505,6 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) { return } s.eventHandler.HandleEvent(schedulerEvent) - err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{ - "project": jobRun.Tenant.ProjectName().String(), - "namespace": jobRun.Tenant.NamespaceName().String(), - "name": jobRun.JobName.String(), - "status": jobRun.State.String(), - }, 1) - if err != nil { - if !strings.Contains(err.Error(), "status code 204") { - s.l.Error("failed metric push", err) - } - } } func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType) error { @@ -584,12 +572,13 @@ func (s *JobRunService) updateOperatorRun(ctx context.Context, event *scheduler. s.l.Error("error updating operator run id [%s]: %s", operatorRun.ID, err) return err } - telemetry.NewGauge("jobrun_durations_breakdown_seconds", map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "job": event.JobName.String(), - "type": operatorType.String(), - }).Set(float64(event.EventTime.Unix() - operatorRun.StartTime.Unix())) + jobRunDdurationsBreakdownSeconds.WithLabelValues( + event.Tenant.ProjectName().String(), + event.Tenant.NamespaceName().String(), + event.JobName.String(), + operatorType.String(), + ).Set(float64(event.EventTime.Unix() - operatorRun.StartTime.Unix())) + return nil } @@ -605,33 +594,17 @@ func (s *JobRunService) trackEvent(event *scheduler.Event) { event.Type, event.EventTime.Format("01/02/06 15:04:05 MST"), event.JobName, event.OperatorName, event.JobScheduledAt.Format("01/02/06 15:04:05 MST"), event.Status) } - if event.Type == scheduler.SensorStartEvent || event.Type == scheduler.SensorRetryEvent || event.Type == scheduler.SensorSuccessEvent || event.Type == scheduler.SensorFailEvent { - eventType := strings.TrimPrefix(event.Type.String(), fmt.Sprintf("%s_", scheduler.OperatorSensor)) - telemetry.NewCounter("jobrun_sensor_events_total", map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "event_type": eventType, - }).Inc() - return - } - if event.Type == scheduler.TaskStartEvent || event.Type == scheduler.TaskRetryEvent || event.Type == scheduler.TaskSuccessEvent || event.Type == scheduler.TaskFailEvent { - eventType := strings.TrimPrefix(event.Type.String(), fmt.Sprintf("%s_", scheduler.OperatorTask)) - telemetry.NewCounter("jobrun_task_events_total", map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "event_type": eventType, - "operator": event.OperatorName, - }).Inc() - return - } - if event.Type == scheduler.HookStartEvent || event.Type == scheduler.HookRetryEvent || event.Type == scheduler.HookSuccessEvent || event.Type == scheduler.HookFailEvent { - eventType := strings.TrimPrefix(event.Type.String(), fmt.Sprintf("%s_", scheduler.OperatorHook)) - telemetry.NewCounter("jobrun_hook_events_total", map[string]string{ - "project": event.Tenant.ProjectName().String(), - "namespace": event.Tenant.NamespaceName().String(), - "event_type": eventType, - "operator": event.OperatorName, - }).Inc() + switch event.Type { + case scheduler.SensorSuccessEvent, scheduler.SensorRetryEvent, scheduler.SensorFailEvent: + jobRunEventsMetric.WithLabelValues( + scheduler.OperatorSensor.String(), + event.Type.String(), + ).Inc() + case scheduler.TaskSuccessEvent, scheduler.TaskRetryEvent, scheduler.TaskFailEvent, scheduler.HookSuccessEvent, scheduler.HookRetryEvent, scheduler.HookFailEvent: + jobRunEventsMetric.WithLabelValues( + event.OperatorName, + event.Type.String(), + ).Inc() } } diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index 30ad0523e5..776623d958 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -5,24 +5,28 @@ import ( "github.com/google/uuid" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/net/context" "github.com/goto/optimus/core/scheduler" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" "github.com/goto/optimus/internal/lib/cron" - "github.com/goto/optimus/internal/telemetry" "github.com/goto/optimus/internal/utils/filter" ) const ( getReplaysDayLimit = 30 // TODO: make it configurable via cli - metricJobReplay = "jobrun_replay_requests_total" - tenantReplayExecutionProjectConfigKey = "REPLAY_EXECUTION_PROJECT" ) +var jobReplayMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobrun_replay_requests_total", + Help: "replay request count with status", +}, []string{"project", "namespace", "name", "status"}) + type SchedulerRunGetter interface { GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) } @@ -94,12 +98,11 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, return uuid.Nil, err } - telemetry.NewCounter(metricJobReplay, map[string]string{ - "project": tenant.ProjectName().String(), - "namespace": tenant.NamespaceName().String(), - "job": jobName.String(), - "status": replayReq.State().String(), - }).Inc() + jobReplayMetric.WithLabelValues(tenant.ProjectName().String(), + tenant.NamespaceName().String(), + jobName.String(), + replayReq.State().String(), + ).Inc() go r.executor.Execute(replayID, replayReq.Tenant(), jobName) diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index 9b725ff0ab..339b0e4995 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -13,7 +13,6 @@ import ( "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" "github.com/goto/optimus/internal/lib/cron" - "github.com/goto/optimus/internal/telemetry" ) const ( @@ -52,6 +51,8 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN defer cancelFn() w.logger.Info("[ReplayID: %s] starting to execute replay", replayID) + project := jobTenant.ProjectName().String() + namespace := jobTenant.NamespaceName().String() jobCron, err := getJobCron(ctx, w.logger, w.jobRepo, jobTenant, jobName) if err != nil { @@ -59,7 +60,7 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN if err := w.replayRepo.UpdateReplayStatus(ctx, replayID, scheduler.ReplayStateFailed, err.Error()); err != nil { w.logger.Error("[ReplayID: %s] unable to update replay to failed: %s", replayID, err.Error()) } - raiseReplayMetric(jobTenant, jobName, scheduler.ReplayStateFailed) + jobReplayMetric.WithLabelValues(project, namespace, jobName.String(), scheduler.ReplayStateFailed.String()).Inc() return } @@ -76,7 +77,7 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN if err := w.replayRepo.UpdateReplayStatus(cleanupCtx, replayID, scheduler.ReplayStateFailed, errMessage); err != nil { w.logger.Error("[ReplayID: %s] unable to set replay status to 'failed': %s", replayID, err.Error()) } - raiseReplayMetric(jobTenant, jobName, scheduler.ReplayStateFailed) + jobReplayMetric.WithLabelValues(project, namespace, jobName.String(), scheduler.ReplayStateFailed.String()).Inc() } } @@ -256,12 +257,3 @@ func (*ReplayWorker) syncStatus(existingJobRuns, incomingJobRuns []*scheduler.Jo return updatedJobRuns } - -func raiseReplayMetric(t tenant.Tenant, jobName scheduler.JobName, state scheduler.ReplayState) { - telemetry.NewCounter(metricJobReplay, map[string]string{ - "project": t.ProjectName().String(), - "namespace": t.NamespaceName().String(), - "job": jobName.String(), - "status": state.String(), - }).Inc() -} diff --git a/core/tenant/handler/v1beta1/secret.go b/core/tenant/handler/v1beta1/secret.go index 7f4e7369f4..e49008574b 100644 --- a/core/tenant/handler/v1beta1/secret.go +++ b/core/tenant/handler/v1beta1/secret.go @@ -5,17 +5,17 @@ import ( "encoding/base64" "github.com/goto/salt/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/core/tenant/dto" "github.com/goto/optimus/internal/errors" - "github.com/goto/optimus/internal/telemetry" pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1" ) const ( - metricSecretEvents = "secret_events_total" secretEventsStatusRegistered = "registered" secretEventsStatusUpdated = "updated" secretEventsStatusDeleted = "deleted" @@ -24,6 +24,10 @@ const ( secretEventsStatusDeleteFailed = "delete_failed" ) +var secretEventsMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "secret_events_total", +}, []string{"project", "namespace", "status"}) + type SecretService interface { Save(ctx context.Context, projName tenant.ProjectName, nsName string, pts *tenant.PlainTextSecret) error Update(ctx context.Context, projName tenant.ProjectName, nsName string, pts *tenant.PlainTextSecret) error @@ -157,9 +161,9 @@ func NewSecretsHandler(l log.Logger, secretService SecretService) *SecretHandler } func raiseSecretEventsMetric(projectName, namespaceName, state string) { - telemetry.NewCounter(metricSecretEvents, map[string]string{ - "project": projectName, - "namespace": namespaceName, - "status": state, - }).Inc() + secretEventsMetric.WithLabelValues( + projectName, + namespaceName, + state, + ).Inc() } diff --git a/ext/scheduler/airflow/airflow.go b/ext/scheduler/airflow/airflow.go index 0bfa1cac96..f7e7f3113d 100644 --- a/ext/scheduler/airflow/airflow.go +++ b/ext/scheduler/airflow/airflow.go @@ -14,6 +14,8 @@ import ( "github.com/goto/salt/log" "github.com/kushsharma/parallel" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "gocloud.dev/blob" "gocloud.dev/gcerrors" @@ -22,7 +24,6 @@ import ( "github.com/goto/optimus/core/tenant" "github.com/goto/optimus/internal/errors" "github.com/goto/optimus/internal/lib/cron" - "github.com/goto/optimus/internal/telemetry" ) //go:embed __lib.py @@ -46,12 +47,18 @@ const ( concurrentTicketPerSec = 50 concurrentLimit = 100 - metricJobUpload = "job_upload_total" - metricJobRemoval = "job_removal_total" metricJobStateSuccess = "success" metricJobStateFailed = "failed" ) +var jobUploadMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "job_upload_total", +}, []string{"project", "namespace", "status"}) + +var jobRemovalMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "job_removal_total", +}, []string{"project", "namespace", "status"}) + type Bucket interface { WriteAll(ctx context.Context, key string, p []byte, opts *blob.WriterOptions) error List(opts *blob.ListOptions) *blob.ListIterator @@ -129,8 +136,11 @@ func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs [ } countDeploySucceed++ } - raiseSchedulerMetric(tenant, metricJobUpload, metricJobStateSuccess, countDeploySucceed) - raiseSchedulerMetric(tenant, metricJobUpload, metricJobStateFailed, countDeployFailed) + + projectName := project.Name().String() + namespace := tenant.NamespaceName().String() + jobUploadMetric.WithLabelValues(projectName, namespace, metricJobStateSuccess).Add(float64(countDeploySucceed)) + jobUploadMetric.WithLabelValues(projectName, namespace, metricJobStateFailed).Add(float64(countDeployFailed)) return multiError.ToErr() } @@ -193,8 +203,11 @@ func (s *Scheduler) DeleteJobs(ctx context.Context, t tenant.Tenant, jobNames [] } countDeleteJobsSucceed++ } - raiseSchedulerMetric(t, metricJobRemoval, metricJobStateSuccess, countDeleteJobsSucceed) - raiseSchedulerMetric(t, metricJobRemoval, metricJobStateFailed, countDeleteJobsFailed) + + projectName := t.ProjectName().String() + namespace := t.NamespaceName().String() + jobRemovalMetric.WithLabelValues(projectName, namespace, metricJobStateSuccess).Add(float64(countDeleteJobsSucceed)) + jobRemovalMetric.WithLabelValues(projectName, namespace, metricJobStateFailed).Add(float64(countDeleteJobsFailed)) err = deleteDirectoryIfEmpty(ctx, t.NamespaceName().String(), bucket) if err != nil { @@ -439,11 +452,3 @@ func NewScheduler(l log.Logger, bucketFac BucketFactory, client Client, compiler secretGetter: secretGetter, } } - -func raiseSchedulerMetric(jobTenant tenant.Tenant, metricName, status string, metricValue int) { - telemetry.NewCounter(metricName, map[string]string{ - "project": jobTenant.ProjectName().String(), - "namespace": jobTenant.NamespaceName().String(), - "status": status, - }).Add(float64(metricValue)) -} diff --git a/ext/store/bigquery/view.go b/ext/store/bigquery/view.go index 576d798312..5f9d87d1d4 100644 --- a/ext/store/bigquery/view.go +++ b/ext/store/bigquery/view.go @@ -22,7 +22,6 @@ func (v ViewHandle) Create(ctx context.Context, res *resource.Resource) error { if err != nil { return err } - meta, err := getMetadataToCreate(view.Description, view.ExtraConfig, res.Metadata().Labels) if err != nil { return errors.AddErrContext(err, EntityView, "failed to get metadata to update for "+res.FullName()) diff --git a/go.mod b/go.mod index 4be713b1d5..1edcdd53f4 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( github.com/mitchellh/mapstructure v1.4.3 github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.11.0 - github.com/prometheus/common v0.30.0 github.com/robfig/cron/v3 v3.0.1 github.com/schollz/progressbar/v3 v3.8.5 github.com/segmentio/kafka-go v0.4.39 @@ -140,6 +139,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect diff --git a/internal/telemetry/prometheus.go b/internal/telemetry/prometheus.go index ec30f5fbeb..32405582ee 100644 --- a/internal/telemetry/prometheus.go +++ b/internal/telemetry/prometheus.go @@ -1,88 +1,14 @@ package telemetry import ( - "sort" - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/push" - "github.com/prometheus/common/expfmt" -) - -var ( - counterMetricMap = map[string]prometheus.Counter{} - counterMetricMutex = sync.Mutex{} - - gaugeMetricMap = map[string]prometheus.Gauge{} - gaugeMetricMutex = sync.Mutex{} - - MetricServer string - - panicMetric = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "panics_recovered", - }, []string{"entity", "msg"}) ) -const metricsPushJob = "optimus_push" +var panicMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "panics_recovered", +}, []string{"entity", "msg"}) func LogPanic(entity, message string) { panicMetric.WithLabelValues(entity, message).Inc() } - -func getKey(metric string, labels map[string]string) string { - eventMetricKey := metric - keys := make([]string, 0, len(labels)) - for k := range labels { - keys = append(keys, k) - } - sort.Strings(keys) - for _, key := range keys { - eventMetricKey += "/" + key + ":" + labels[key] - } - return eventMetricKey -} - -func NewCounter(metric string, labels map[string]string) prometheus.Counter { - metricKey := getKey(metric, labels) - - counterMetricMutex.Lock() - defer counterMetricMutex.Unlock() - - if existingMetric, ok := counterMetricMap[metricKey]; ok { - return existingMetric - } - newMetric := promauto.NewCounter(prometheus.CounterOpts{Name: metric, ConstLabels: labels}) - counterMetricMap[metricKey] = newMetric - return newMetric -} - -func SetGaugeViaPush(name string, labels map[string]string, val float64) error { - if MetricServer == "" { - return nil - } - metric := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: name, - ConstLabels: labels, - }) - metric.Set(val) - - return push.New(MetricServer, metricsPushJob). - Format(expfmt.FmtText). - Collector(metric). - Push() -} - -func NewGauge(metric string, labels map[string]string) prometheus.Gauge { - metricKey := getKey(metric, labels) - - gaugeMetricMutex.Lock() - defer gaugeMetricMutex.Unlock() - - if existingMetric, ok := gaugeMetricMap[metricKey]; ok { - return existingMetric - } - newMetric := promauto.NewGauge(prometheus.GaugeOpts{Name: metric, ConstLabels: labels}) - gaugeMetricMap[metricKey] = newMetric - return newMetric -} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 0ab0df6198..02b1821623 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -47,8 +47,6 @@ func Init(l log.Logger, conf config.TelemetryConfig) (func(), error) { otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) } - MetricServer = conf.MetricServerAddr - var metricServer *http.Server if conf.ProfileAddr != "" { l.Debug("enabling profile metrics", "addr", conf.ProfileAddr)