Skip to content

Commit

Permalink
feat: propagate context to the metrics package (kyverno#5479)
Browse files Browse the repository at this point in the history
Signed-off-by: Charles-Edouard Brétéché <[email protected]>

Signed-off-by: Charles-Edouard Brétéché <[email protected]>
  • Loading branch information
eddycharly authored Nov 28, 2022
1 parent 08447c1 commit dfded5c
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 116 deletions.
5 changes: 3 additions & 2 deletions cmd/cleanup-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ const (
resyncPeriod = 15 * time.Minute
)

func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
func setupMetrics(ctx context.Context, logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
metricsConfiguration := internal.GetMetricsConfiguration(logger, kubeClient)
metricsAddr := ":" + metricsPort
metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics(
ctx,
disableMetricsExport,
otel,
metricsAddr,
Expand Down Expand Up @@ -105,7 +106,7 @@ func main() {
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
metricsConfig, metricsShutdown, err := setupMetrics(ctx, logger, rawClient)
if err != nil {
logger.Error(err, "failed to setup metrics")
os.Exit(1)
Expand Down
11 changes: 6 additions & 5 deletions cmd/kyverno/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ func parseFlags(config internal.Configuration) {
flag.Parse()
}

func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
func setupMetrics(ctx context.Context, logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
metricsConfiguration := internal.GetMetricsConfiguration(logger, kubeClient)
metricsAddr := ":" + metricsPort
metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics(
ctx,
disableMetricsExport,
otel,
metricsAddr,
Expand Down Expand Up @@ -425,18 +426,18 @@ func main() {
internal.SetupProfiling(logger)
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup signals
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
metricsConfig, metricsShutdown, err := setupMetrics(signalCtx, logger, rawClient)
if err != nil {
logger.Error(err, "failed to setup metrics")
os.Exit(1)
}
if metricsShutdown != nil {
defer metricsShutdown()
}
// setup signals
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// create instrumented clients
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
leaderElectionClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
Expand Down
26 changes: 14 additions & 12 deletions pkg/controllers/metrics/policy/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package policy

import (
"context"

kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
"github.com/kyverno/kyverno/pkg/metrics"
Expand All @@ -27,17 +29,17 @@ func NewController(metricsConfig *metrics.MetricsConfig, cpolInformer kyvernov1i
func (c *controller) addPolicy(obj interface{}) {
p := obj.(*kyvernov1.ClusterPolicy)
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricAddPolicy(logger, p)
go c.registerPolicyRuleInfoMetricAddPolicy(context.TODO(), logger, p)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricAddPolicy(logger, p)
go c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p)
}

func (c *controller) updatePolicy(old, cur interface{}) {
oldP, curP := old.(*kyvernov1.ClusterPolicy), cur.(*kyvernov1.ClusterPolicy)
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricUpdatePolicy(logger, oldP, curP)
go c.registerPolicyRuleInfoMetricUpdatePolicy(context.TODO(), logger, oldP, curP)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricUpdatePolicy(logger, oldP, curP)
go c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP)
}

func (c *controller) deletePolicy(obj interface{}) {
Expand All @@ -47,25 +49,25 @@ func (c *controller) deletePolicy(obj interface{}) {
return
}
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricDeletePolicy(logger, p)
go c.registerPolicyRuleInfoMetricDeletePolicy(context.TODO(), logger, p)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricDeletePolicy(logger, p)
go c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p)
}

func (c *controller) addNsPolicy(obj interface{}) {
p := obj.(*kyvernov1.Policy)
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricAddPolicy(logger, p)
go c.registerPolicyRuleInfoMetricAddPolicy(context.TODO(), logger, p)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricAddPolicy(logger, p)
go c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p)
}

func (c *controller) updateNsPolicy(old, cur interface{}) {
oldP, curP := old.(*kyvernov1.Policy), cur.(*kyvernov1.Policy)
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricUpdatePolicy(logger, oldP, curP)
go c.registerPolicyRuleInfoMetricUpdatePolicy(context.TODO(), logger, oldP, curP)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricUpdatePolicy(logger, oldP, curP)
go c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP)
}

func (c *controller) deleteNsPolicy(obj interface{}) {
Expand All @@ -75,7 +77,7 @@ func (c *controller) deleteNsPolicy(obj interface{}) {
return
}
// register kyverno_policy_rule_info_total metric concurrently
go c.registerPolicyRuleInfoMetricDeletePolicy(logger, p)
go c.registerPolicyRuleInfoMetricDeletePolicy(context.TODO(), logger, p)
// register kyverno_policy_changes_total metric concurrently
go c.registerPolicyChangesMetricDeletePolicy(logger, p)
go c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p)
}
29 changes: 15 additions & 14 deletions pkg/controllers/metrics/policy/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package policy

import (
"context"
"reflect"

"github.com/go-logr/logr"
Expand All @@ -9,61 +10,61 @@ import (
policyRuleInfoMetric "github.com/kyverno/kyverno/pkg/metrics/policyruleinfo"
)

func (pc *controller) registerPolicyRuleInfoMetricAddPolicy(logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyRuleInfoMetric.AddPolicy(pc.metricsConfig, p)
func (pc *controller) registerPolicyRuleInfoMetricAddPolicy(ctx context.Context, logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyRuleInfoMetric.AddPolicy(ctx, pc.metricsConfig, p)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_rule_info_total metrics for the above policy's creation", "name", p.GetName())
}
}

func (pc *controller) registerPolicyRuleInfoMetricUpdatePolicy(logger logr.Logger, oldP, curP kyvernov1.PolicyInterface) {
func (pc *controller) registerPolicyRuleInfoMetricUpdatePolicy(ctx context.Context, logger logr.Logger, oldP, curP kyvernov1.PolicyInterface) {
// removing the old rules associated metrics
err := policyRuleInfoMetric.RemovePolicy(pc.metricsConfig, oldP)
err := policyRuleInfoMetric.RemovePolicy(ctx, pc.metricsConfig, oldP)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_rule_info_total metrics for the above policy's updation", "name", oldP.GetName())
}
// adding the new rules associated metrics
err = policyRuleInfoMetric.AddPolicy(pc.metricsConfig, curP)
err = policyRuleInfoMetric.AddPolicy(ctx, pc.metricsConfig, curP)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_rule_info_total metrics for the above policy's updation", "name", oldP.GetName())
}
}

func (pc *controller) registerPolicyRuleInfoMetricDeletePolicy(logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyRuleInfoMetric.RemovePolicy(pc.metricsConfig, p)
func (pc *controller) registerPolicyRuleInfoMetricDeletePolicy(ctx context.Context, logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyRuleInfoMetric.RemovePolicy(ctx, pc.metricsConfig, p)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_rule_info_total metrics for the above policy's deletion", "name", p.GetName())
}
}

func (pc *controller) registerPolicyChangesMetricAddPolicy(logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyChangesMetric.RegisterPolicy(pc.metricsConfig, p, policyChangesMetric.PolicyCreated)
func (pc *controller) registerPolicyChangesMetricAddPolicy(ctx context.Context, logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyChangesMetric.RegisterPolicy(ctx, pc.metricsConfig, p, policyChangesMetric.PolicyCreated)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_changes_total metrics for the above policy's creation", "name", p.GetName())
}
}

func (pc *controller) registerPolicyChangesMetricUpdatePolicy(logger logr.Logger, oldP, curP kyvernov1.PolicyInterface) {
func (pc *controller) registerPolicyChangesMetricUpdatePolicy(ctx context.Context, logger logr.Logger, oldP, curP kyvernov1.PolicyInterface) {
oldSpec := oldP.GetSpec()
curSpec := curP.GetSpec()
if reflect.DeepEqual(oldSpec, curSpec) {
return
}
err := policyChangesMetric.RegisterPolicy(pc.metricsConfig, oldP, policyChangesMetric.PolicyUpdated)
err := policyChangesMetric.RegisterPolicy(ctx, pc.metricsConfig, oldP, policyChangesMetric.PolicyUpdated)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_changes_total metrics for the above policy's updation", "name", oldP.GetName())
}
// curP will require a new kyverno_policy_changes_total metric if the above update involved change in the following fields:
if curSpec.BackgroundProcessingEnabled() != oldSpec.BackgroundProcessingEnabled() || curSpec.ValidationFailureAction.Enforce() != oldSpec.ValidationFailureAction.Enforce() {
err = policyChangesMetric.RegisterPolicy(pc.metricsConfig, curP, policyChangesMetric.PolicyUpdated)
err = policyChangesMetric.RegisterPolicy(ctx, pc.metricsConfig, curP, policyChangesMetric.PolicyUpdated)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_changes_total metrics for the above policy's updation", "name", curP.GetName())
}
}
}

func (pc *controller) registerPolicyChangesMetricDeletePolicy(logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyChangesMetric.RegisterPolicy(pc.metricsConfig, p, policyChangesMetric.PolicyDeleted)
func (pc *controller) registerPolicyChangesMetricDeletePolicy(ctx context.Context, logger logr.Logger, p kyvernov1.PolicyInterface) {
err := policyChangesMetric.RegisterPolicy(ctx, pc.metricsConfig, p, policyChangesMetric.PolicyDeleted)
if err != nil {
logger.Error(err, "error occurred while registering kyverno_policy_changes_total metrics for the above policy's deletion", "name", p.GetName())
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/metrics/admissionrequests/admissionRequests.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package admissionrequests

import (
"context"
"fmt"
"strings"

Expand All @@ -9,7 +10,7 @@ import (
admissionv1 "k8s.io/api/admission/v1"
)

func registerAdmissionRequestsMetric(m *metrics.MetricsConfig, resourceKind, resourceNamespace string, resourceRequestOperation metrics.ResourceRequestOperation, allowed bool) {
func registerAdmissionRequestsMetric(ctx context.Context, m *metrics.MetricsConfig, resourceKind, resourceNamespace string, resourceRequestOperation metrics.ResourceRequestOperation, allowed bool) {
includeNamespaces, excludeNamespaces := m.Config.GetIncludeNamespaces(), m.Config.GetExcludeNamespaces()
if (resourceNamespace != "" && resourceNamespace != "-") && utils.ContainsString(excludeNamespaces, resourceNamespace) {
m.Log.V(2).Info(fmt.Sprintf("Skipping the registration of kyverno_admission_requests_total metric as the operation belongs to the namespace '%s' which is one of 'namespaces.exclude' %+v in values.yaml", resourceNamespace, excludeNamespaces))
Expand All @@ -19,10 +20,10 @@ func registerAdmissionRequestsMetric(m *metrics.MetricsConfig, resourceKind, res
m.Log.V(2).Info(fmt.Sprintf("Skipping the registration of kyverno_admission_requests_total metric as the operation belongs to the namespace '%s' which is not one of 'namespaces.include' %+v in values.yaml", resourceNamespace, includeNamespaces))
return
}
m.RecordAdmissionRequests(resourceKind, resourceNamespace, resourceRequestOperation, allowed)
m.RecordAdmissionRequests(ctx, resourceKind, resourceNamespace, resourceRequestOperation, allowed)
}

func Process(m *metrics.MetricsConfig, request *admissionv1.AdmissionRequest, response *admissionv1.AdmissionResponse) {
func Process(ctx context.Context, m *metrics.MetricsConfig, request *admissionv1.AdmissionRequest, response *admissionv1.AdmissionResponse) {
op := strings.ToLower(string(request.Operation))
registerAdmissionRequestsMetric(m, request.Kind.Kind, request.Namespace, metrics.ResourceRequestOperation(op), response.Allowed)
registerAdmissionRequestsMetric(ctx, m, request.Kind.Kind, request.Namespace, metrics.ResourceRequestOperation(op), response.Allowed)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package admissionreviewduration

import (
"context"
"fmt"
"strings"

Expand All @@ -9,7 +10,7 @@ import (
admissionv1 "k8s.io/api/admission/v1"
)

func registerAdmissionReviewDurationMetric(m *metrics.MetricsConfig, resourceKind, resourceNamespace string, resourceRequestOperation metrics.ResourceRequestOperation, admissionRequestLatency float64, allowed bool) {
func registerAdmissionReviewDurationMetric(ctx context.Context, m *metrics.MetricsConfig, resourceKind, resourceNamespace string, resourceRequestOperation metrics.ResourceRequestOperation, admissionRequestLatency float64, allowed bool) {
includeNamespaces, excludeNamespaces := m.Config.GetIncludeNamespaces(), m.Config.GetExcludeNamespaces()
if (resourceNamespace != "" && resourceNamespace != "-") && utils.ContainsString(excludeNamespaces, resourceNamespace) {
m.Log.V(2).Info(fmt.Sprintf("Skipping the registration of kyverno_admission_review_duration_seconds metric as the operation belongs to the namespace '%s' which is one of 'namespaces.exclude' %+v in values.yaml", resourceNamespace, excludeNamespaces))
Expand All @@ -19,11 +20,11 @@ func registerAdmissionReviewDurationMetric(m *metrics.MetricsConfig, resourceKin
m.Log.V(2).Info(fmt.Sprintf("Skipping the registration of kyverno_admission_review_duration_seconds metric as the operation belongs to the namespace '%s' which is not one of 'namespaces.include' %+v in values.yaml", resourceNamespace, includeNamespaces))
return
}
m.RecordAdmissionReviewDuration(resourceKind, resourceNamespace, string(resourceRequestOperation), admissionRequestLatency, allowed)
m.RecordAdmissionReviewDuration(ctx, resourceKind, resourceNamespace, string(resourceRequestOperation), admissionRequestLatency, allowed)
}

func Process(m *metrics.MetricsConfig, request *admissionv1.AdmissionRequest, response *admissionv1.AdmissionResponse, latency int64) {
func Process(ctx context.Context, m *metrics.MetricsConfig, request *admissionv1.AdmissionRequest, response *admissionv1.AdmissionResponse, latency int64) {
op := strings.ToLower(string(request.Operation))
admissionReviewLatencyDurationInSeconds := float64(latency) / float64(1000*1000*1000)
registerAdmissionReviewDurationMetric(m, request.Kind.Kind, request.Namespace, metrics.ResourceRequestOperation(op), admissionReviewLatencyDurationInSeconds, response.Allowed)
registerAdmissionReviewDurationMetric(ctx, m, request.Kind.Kind, request.Namespace, metrics.ResourceRequestOperation(op), admissionReviewLatencyDurationInSeconds, response.Allowed)
}
8 changes: 7 additions & 1 deletion pkg/metrics/client.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package metrics

import "context"

type Recorder interface {
Record(clientQueryOperation ClientQueryOperation)
}
Expand Down Expand Up @@ -29,5 +31,9 @@ func ClusteredClientQueryRecorder(m MetricsConfigManager, kind string, client Cl
}

func (r *clientQueryRecorder) Record(clientQueryOperation ClientQueryOperation) {
r.manager.RecordClientQueries(clientQueryOperation, r.client, r.kind, r.ns)
r.RecordWithContext(context.TODO(), clientQueryOperation)
}

func (r *clientQueryRecorder) RecordWithContext(ctx context.Context, clientQueryOperation ClientQueryOperation) {
r.manager.RecordClientQueries(ctx, clientQueryOperation, r.client, r.kind, r.ns)
}
5 changes: 4 additions & 1 deletion pkg/metrics/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"net/http"

"github.com/go-logr/logr"
Expand All @@ -10,6 +11,7 @@ import (
)

func InitMetrics(
ctx context.Context,
disableMetricsExport bool,
otel string,
metricsAddr string,
Expand Down Expand Up @@ -41,6 +43,7 @@ func InitMetrics(

endpoint := otelCollector + metricsAddr
pusher, err = NewOTLPGRPCConfig(
ctx,
endpoint,
transportCreds,
kubeClient,
Expand All @@ -51,7 +54,7 @@ func InitMetrics(
}
} else if otel == "prometheus" {
// Prometheus Server will serve metrics on metrics-port
metricsServerMux, err = NewPrometheusConfig(log)
metricsServerMux, err = NewPrometheusConfig(ctx, log)

if err != nil {
return nil, nil, pusher, err
Expand Down
Loading

0 comments on commit dfded5c

Please sign in to comment.