Skip to content

Commit

Permalink
Merge branch 'main' into issue-4221
Browse files Browse the repository at this point in the history
  • Loading branch information
ansalamdaniel authored Nov 24, 2022
2 parents 8add394 + 3d7e0e7 commit 6a20ab6
Show file tree
Hide file tree
Showing 36 changed files with 728 additions and 536 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/conformance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on:
- 'main'
- 'release*'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
run-conformance:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ on:

permissions: read-all

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
e2e-test:
strategy:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/image-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ permissions:
packages: write
id-token: write

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
pre-checks:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ permissions:
packages: write
id-token: write

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
push-init-kyverno:
uses: ./.github/workflows/reuse.yaml
Expand Down
76 changes: 13 additions & 63 deletions cmd/cleanup-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"flag"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/go-logr/logr"
Expand All @@ -20,13 +18,9 @@ import (
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var (
kubeconfig string
clientRateLimitQPS float64
clientRateLimitBurst int
otel string
otelCollector string
metricsPort string
Expand All @@ -40,9 +34,6 @@ const (

func parseFlags(config internal.Configuration) {
internal.InitFlags(config)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
Expand All @@ -51,46 +42,6 @@ func parseFlags(config internal.Configuration) {
flag.Parse()
}

func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, error) {
logger = logger.WithName("kube-clients")
logger.Info("create kube clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil {
return nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, nil, err
}
return clientConfig, kubeClient, nil
}

func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientConfig *rest.Config, metricsConfig *metrics.MetricsConfig) (kubernetes.Interface, dclient.Interface, error) {
logger = logger.WithName("instrumented-clients")
logger.Info("create instrumented clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
kubeClient, err := kubeclient.NewForConfig(
clientConfig,
kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
kubeclient.WithTracing(),
)
if err != nil {
return nil, nil, err
}
dynamicClient, err := dynamicclient.NewForConfig(
clientConfig,
dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
dynamicclient.WithTracing(),
)
if err != nil {
return nil, nil, err
}
dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, resyncPeriod)
if err != nil {
return nil, nil, err
}
return kubeClient, dClient, nil
}

func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
Expand Down Expand Up @@ -130,13 +81,13 @@ func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics
return metricsConfig, cancel, nil
}

func setupSignals() (context.Context, context.CancelFunc) {
return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
}

func main() {
// config
appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
internal.WithTracing(),
internal.WithKubeconfig(),
)
// parse flags
parseFlags(appConfig)
// setup logger
Expand All @@ -148,13 +99,10 @@ func main() {
internal.ShowVersion(logger)
// start profiling
internal.SetupProfiling(logger)
// create client config and kube clients
clientConfig, rawClient, err := createKubeClients(logger)
if err != nil {
os.Exit(1)
}
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup signals
signalCtx, signalCancel := setupSignals()
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
Expand All @@ -166,14 +114,16 @@ func main() {
defer metricsShutdown()
}
// create instrumented clients
kubeClient, dynamicClient, err := createInstrumentedClients(signalCtx, logger, clientConfig, metricsConfig)
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
logger.Error(err, "failed to create instrument clients")
logger.Error(err, "failed to create dynamic client")
os.Exit(1)
}
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
policyHandlers := NewHandlers(
dynamicClient,
dClient,
)
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
// start informers and wait for cache sync
Expand Down
8 changes: 3 additions & 5 deletions cmd/cleanup-controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ func NewServer(
mux.HandlerFunc(
"POST",
ValidatingWebhookServicePath,
http.HandlerFunc(
handlers.AdmissionHandler(policyHandlers.Validate).
WithAdmission(logger.Logger.WithName("validate")).
WithTrace(),
),
handlers.FromAdmissionFunc("VALIDATE", policyHandlers.Validate).
WithAdmission(logger.Logger.WithName("validate")).
ToHandlerFunc(),
)
return &server{
server: &http.Server{
Expand Down
57 changes: 12 additions & 45 deletions cmd/initContainer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"encoding/json"
"flag"
"os"
"os/signal"
"sync"
"syscall"
"time"

kyvernov1beta1 "github.com/kyverno/kyverno/api/kyverno/v1beta1"
Expand All @@ -27,16 +25,9 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

var (
kubeconfig string
clientRateLimitQPS float64
clientRateLimitBurst int
)

const (
policyReportKind string = "PolicyReport"
clusterPolicyReportKind string = "ClusterPolicyReport"
Expand All @@ -45,15 +36,14 @@ const (

func parseFlags(config internal.Configuration) {
internal.InitFlags(config)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 0, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.Parse()
}

func main() {
// config
appConfig := internal.NewConfiguration()
appConfig := internal.NewConfiguration(
internal.WithKubeconfig(),
)
// parse flags
parseFlags(appConfig)
// setup logger
Expand All @@ -64,29 +54,12 @@ func main() {
// show version
internal.ShowVersion(logger)
// os signal handler
signalCtx, signalCancel := signal.NotifyContext(logging.Background(), os.Interrupt, syscall.SIGTERM)
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()

stopCh := signalCtx.Done()

// create client config
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil {
logger.Error(err, "Failed to build kubeconfig")
os.Exit(1)
}

kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create kubernetes client")
os.Exit(1)
}

dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create dynamic client")
os.Exit(1)
}
kubeClient := internal.CreateKubernetesClient(logger)
dynamicClient := internal.CreateDynamicClient(logger)
kyvernoClient := internal.CreateKyvernoClient(logger)

// DYNAMIC CLIENT
// - client for all registered resources
Expand All @@ -101,12 +74,6 @@ func main() {
os.Exit(1)
}

pclient, err := kyvernoclient.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create client")
os.Exit(1)
}

// Exit for unsupported version of kubernetes cluster
if !utils.HigherThanKubernetesVersion(kubeClient.Discovery(), logging.GlobalLogger(), 1, 16, 0) {
os.Exit(1)
Expand All @@ -120,7 +87,7 @@ func main() {

go func() {
defer signalCancel()
<-stopCh
<-signalCtx.Done()
}()

done := make(chan struct{})
Expand Down Expand Up @@ -152,13 +119,13 @@ func main() {
}

// use pipeline to pass request to cleanup resources
in := gen(done, stopCh, requests...)
in := gen(done, signalCtx.Done(), requests...)
// process requests
// processing routine count : 2
p1 := process(client, pclient, done, stopCh, in)
p2 := process(client, pclient, done, stopCh, in)
p1 := process(client, kyvernoClient, done, signalCtx.Done(), in)
p2 := process(client, kyvernoClient, done, signalCtx.Done(), in)
// merge results from processing routines
for err := range merge(done, stopCh, p1, p2) {
for err := range merge(done, signalCtx.Done(), p1, p2) {
if err != nil {
failure = true
logging.Error(err, "failed to cleanup resource")
Expand Down
1 change: 1 addition & 0 deletions cmd/internal/maxprocs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

func SetupMaxProcs(logger logr.Logger) func() {
logger = logger.WithName("maxprocs")
logger.Info("setup maxprocs...")
undo, err := maxprocs.Set(
maxprocs.Logger(
func(format string, args ...interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

func SetupProfiling(logger logr.Logger) {
logger = logger.WithName("profiling").WithValues("enabled", profilingEnabled, "address", profilingAddress, "port", profilingPort)
logger.Info("start profiling...")
if profilingEnabled {
logger.Info("setup profiling...")
profiling.Start(logger, net.JoinHostPort(profilingAddress, profilingPort))
}
}
18 changes: 18 additions & 0 deletions cmd/internal/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package internal

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/go-logr/logr"
)

var Context = context.Background()

func SetupSignals(logger logr.Logger) (context.Context, context.CancelFunc) {
logger = logger.WithName("signals")
logger.Info("setup signals...")
return signal.NotifyContext(Context, os.Interrupt, syscall.SIGTERM)
}
2 changes: 1 addition & 1 deletion cmd/internal/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

func SetupTracing(logger logr.Logger, name string, kubeClient kubernetes.Interface) context.CancelFunc {
logger = logger.WithName("tracing").WithValues("enabled", tracingEnabled, "address", tracingAddress, "port", tracingPort, "creds", tracingCreds)
logger.Info("setup tracing...")
if tracingEnabled {
logger.Info("setup tracing...")
shutdown, err := tracing.NewTraceConfig(
logger,
name,
Expand Down
Loading

0 comments on commit 6a20ab6

Please sign in to comment.