From aaa80619b05d47583675ee957dcb280ec01dd71a Mon Sep 17 00:00:00 2001 From: Matt Prahl Date: Mon, 9 Sep 2024 16:41:19 -0400 Subject: [PATCH] Index additional printer columns for Gatekeeper constraints (#299) * Use type assertions in GenericInformer Since the dynamic client is always used, the object will always be of type *unstructured.Unstructured, so no need to do anything but use type assertions. Signed-off-by: mprahl * Index additional printer columns for Gatekeeper constraints Gatekeeper constraints are CRD based but the CRDs are generated based on user defined ConstraintTemplate objects. Because of this, indexing the constraint's `spec.enforcementAction` and `status.totalViolations` is a bit difficult with the current setup. This makes it so that the CRD's additionalPrinterColumns are automatically indexed. The scope of this change is limited to the constraints.gatekeeper.sh API group, but removing that check in the `GenericResourceBuilder` function will enable it for all resources that don't have a custom transform. Note that the names from the additionalPrinterColumns in the CRD are converted to lowerCamelCase style when set as node properties. The separators are spaces and dashes (e.g. "enforcement action" and "enforcement-action" become "enforcementAction"). Duplicate node properties are ignored, with a priority on the default generic property names. As part of this change, it made sense to also make `syncInformers` event driven so that it only gets run when there is a likely relevant change in a CRD (i.e. added, deleted, or the spec changed). This makes the indexing of new custom resources nearly instant while also sharing the CRD informer for keeping an up to date mapping of GVRs to additional printer columns. Lastly, main.go was refactored a bit to wait for the two main goroutines to end before exiting. This allows for some clean up when the interrupt signal is sent to the process. 
More refactoring to let every goroutine clean up should be considered. Relates: https://issues.redhat.com/browse/ACM-13278 https://issues.redhat.com/browse/ACM-12572 Signed-off-by: mprahl * Add _isExternal field to Gatekeeper constraints Signed-off-by: mprahl --------- Signed-off-by: mprahl --- config.json | 1 - main.go | 49 ++- pkg/config/config.go | 70 ++-- pkg/informer/informer.go | 40 +- pkg/informer/informer_test.go | 23 +- pkg/informer/runInformers.go | 516 ++++++++++++++++++++++-- pkg/informer/runInformers_test.go | 184 ++++++++- pkg/send/sender.go | 10 +- pkg/transforms/genericResourceConfig.go | 57 ++- pkg/transforms/genericresource.go | 81 ++-- pkg/transforms/policy.go | 20 +- pkg/transforms/transformer.go | 18 +- pkg/transforms/transformer_test.go | 45 ++- 13 files changed, 945 insertions(+), 169 deletions(-) diff --git a/config.json b/config.json index 2499266e..7bfb54c3 100644 --- a/config.json +++ b/config.json @@ -3,7 +3,6 @@ "ClusterName": "local-cluster", "ClusterNamespace": "local-cluster-ns", "ReportRateMS": 5000, - "RediscoverRateMS": 60000, "RuntimeMode": "development", "DeployedInHub": true } diff --git a/main.go b/main.go index 1d372fbf..cec25088 100644 --- a/main.go +++ b/main.go @@ -3,10 +3,14 @@ package main import ( + "context" "flag" "fmt" "os" + "os/signal" "runtime" + "sync" + "syscall" "time" "github.com/stolostron/search-collector/pkg/config" @@ -25,6 +29,26 @@ const ( LeaseDurationSeconds = 60 ) +// getMainContext returns a context that is canceled on SIGINT or SIGTERM signals. If a second signal is received, +// it exits directly. +// This was inspired by controller-runtime. +func getMainContext() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 2) + + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + cancel() + <-c + os.Exit(1) // Second signal. Exit directly. 
+ }() + + return ctx +} + func main() { // init logs flag.Parse() @@ -73,8 +97,22 @@ func main() { informersInitialized := make(chan interface{}) + mainCtx := getMainContext() + + wg := sync.WaitGroup{} + wg.Add(1) + // Start a routine to keep our informers up to date. - go informer.RunInformers(informersInitialized, upsertTransformer, reconciler) + go func() { + err := informer.RunInformers(mainCtx, informersInitialized, upsertTransformer, reconciler) + if err != nil { + glog.Errorf("Failed to run the informers: %v", err) + + os.Exit(1) + } + + wg.Done() + }() // Wait here until informers have collected the full state of the cluster. // The initial payload must have the complete state to avoid unecessary deletion @@ -83,5 +121,12 @@ func main() { <-informersInitialized glog.Info("Starting the sender.") - sender.StartSendLoop() + wg.Add(1) + + go func() { + sender.StartSendLoop(mainCtx) + wg.Done() + }() + + wg.Wait() } diff --git a/pkg/config/config.go b/pkg/config/config.go index 0bc14a84..0284b8fc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -26,39 +26,38 @@ import ( // Out of box defaults const ( - COLLECTOR_API_VERSION = "2.11.0" - DEFAULT_AGGREGATOR_URL = "https://localhost:3010" // this will be deprecated in the future - DEFAULT_AGGREGATOR_HOST = "https://localhost" - DEFAULT_AGGREGATOR_PORT = "3010" - DEFAULT_COLLECT_ANNOTATIONS = false - DEFAULT_CLUSTER_NAME = "local-cluster" - DEFAULT_POD_NAMESPACE = "open-cluster-management" - DEFAULT_HEARTBEAT_MS = 300000 // 5 min - DEFAULT_MAX_BACKOFF_MS = 600000 // 10 min - DEFAULT_REDISCOVER_RATE_MS = 120000 // 2 min - DEFAULT_REPORT_RATE_MS = 5000 // 5 seconds - DEFAULT_RETRY_JITTER_MS = 5000 // 5 seconds - DEFAULT_RUNTIME_MODE = "production" + COLLECTOR_API_VERSION = "2.11.0" + DEFAULT_AGGREGATOR_URL = "https://localhost:3010" // this will be deprecated in the future + DEFAULT_AGGREGATOR_HOST = "https://localhost" + DEFAULT_AGGREGATOR_PORT = "3010" + DEFAULT_CLUSTER_NAME = "local-cluster" + 
DEFAULT_POD_NAMESPACE = "open-cluster-management" + DEFAULT_HEARTBEAT_MS = 300000 // 5 min + DEFAULT_MAX_BACKOFF_MS = 600000 // 10 min + DEFAULT_REDISCOVER_RATE_MS = 120000 // 2 min + DEFAULT_REPORT_RATE_MS = 5000 // 5 seconds + DEFAULT_RETRY_JITTER_MS = 5000 // 5 seconds + DEFAULT_RUNTIME_MODE = "production" ) // Configuration options for the search-collector. type Config struct { - AggregatorConfig *rest.Config // Config object for hub. Used to get TLS credentials. - AggregatorConfigFile string `env:"HUB_CONFIG"` // Config file for hub. Will be mounted in a secret. - AggregatorURL string `env:"AGGREGATOR_URL"` // URL of the Aggregator, includes port but not any path - AggregatorHost string `env:"AGGREGATOR_HOST"` // Host of the Aggregator - AggregatorPort string `env:"AGGREGATOR_PORT"` // Port of the Aggregator - CollectAnnotations bool `env:"COLLECT_ANNOTATIONS"` // Collect all annotations with values <=64 characters - ClusterName string `env:"CLUSTER_NAME"` // The name of of the cluster where this pod is running - PodNamespace string `env:"POD_NAMESPACE"` // The namespace of this pod - DeployedInHub bool `env:"DEPLOYED_IN_HUB"` // Tracks if deployed in the Hub or Managed cluster - HeartbeatMS int `env:"HEARTBEAT_MS"` // Interval(ms) to send empty payload to ensure connection - KubeConfig string `env:"KUBECONFIG"` // Local kubeconfig path - MaxBackoffMS int `env:"MAX_BACKOFF_MS"` // Maximum backoff in ms to wait after error - RediscoverRateMS int `env:"REDISCOVER_RATE_MS"` // Interval(ms) to poll for changes to CRDs - RetryJitterMS int `env:"RETRY_JITTER_MS"` // Random jitter added to backoff wait. - ReportRateMS int `env:"REPORT_RATE_MS"` // Interval(ms) to send changes to the aggregator - RuntimeMode string `env:"RUNTIME_MODE"` // Running mode (development or production) + AggregatorConfig *rest.Config // Config object for hub. Used to get TLS credentials. + AggregatorConfigFile string `env:"HUB_CONFIG"` // Config file for hub. Will be mounted in a secret. 
+ AggregatorURL string `env:"AGGREGATOR_URL"` // URL of the Aggregator, includes port but not any path + AggregatorHost string `env:"AGGREGATOR_HOST"` // Host of the Aggregator + AggregatorPort string `env:"AGGREGATOR_PORT"` // Port of the Aggregator + CollectAnnotations bool `env:"COLLECT_ANNOTATIONS"` // Collect all annotations with values <=64 characters + CollectCRDPrinterColumns bool `env:"COLLECT_CRD_PRINTER_COLUMNS"` // Enable collecting additional printer columns in the CRD + ClusterName string `env:"CLUSTER_NAME"` // The name of of the cluster where this pod is running + PodNamespace string `env:"POD_NAMESPACE"` // The namespace of this pod + DeployedInHub bool `env:"DEPLOYED_IN_HUB"` // Tracks if deployed in the Hub or Managed cluster + HeartbeatMS int `env:"HEARTBEAT_MS"` // Interval(ms) to send empty payload to ensure connection + KubeConfig string `env:"KUBECONFIG"` // Local kubeconfig path + MaxBackoffMS int `env:"MAX_BACKOFF_MS"` // Maximum backoff in ms to wait after error + RetryJitterMS int `env:"RETRY_JITTER_MS"` // Random jitter added to backoff wait. 
+ ReportRateMS int `env:"REPORT_RATE_MS"` // Interval(ms) to send changes to the aggregator + RuntimeMode string `env:"RUNTIME_MODE"` // Running mode (development or production) } var Cfg = Config{} @@ -93,7 +92,7 @@ func InitConfig() { aggHost, aggHostPresent := os.LookupEnv("AGGREGATOR_HOST") aggPort, aggPortPresent := os.LookupEnv("AGGREGATOR_PORT") - //If environment variables are set for aggregator host and port, use those to set the AggregatorURL + // If environment variables are set for aggregator host and port, use those to set the AggregatorURL if aggHostPresent && aggPortPresent && aggHost != "" && aggPort != "" { Cfg.AggregatorURL = net.JoinHostPort(aggHost, aggPort) setDefault(&Cfg.AggregatorURL, "", net.JoinHostPort(DEFAULT_AGGREGATOR_HOST, DEFAULT_AGGREGATOR_PORT)) @@ -103,7 +102,6 @@ func InitConfig() { setDefaultInt(&Cfg.HeartbeatMS, "HEARTBEAT_MS", DEFAULT_HEARTBEAT_MS) setDefaultInt(&Cfg.MaxBackoffMS, "MAX_BACKOFF_MS", DEFAULT_MAX_BACKOFF_MS) - setDefaultInt(&Cfg.RediscoverRateMS, "REDISCOVER_RATE_MS", DEFAULT_REDISCOVER_RATE_MS) setDefaultInt(&Cfg.ReportRateMS, "REPORT_RATE_MS", DEFAULT_REPORT_RATE_MS) setDefaultInt(&Cfg.RetryJitterMS, "RETRY_JITTER_MS", DEFAULT_RETRY_JITTER_MS) @@ -138,6 +136,16 @@ func InitConfig() { } setDefault(&Cfg.AggregatorConfigFile, "HUB_CONFIG", "") + if collectCRDPrinterCols := os.Getenv("COLLECT_CRD_PRINTER_COLUMNS"); collectCRDPrinterCols != "" { + glog.Infof("Using COLLECT_CRD_PRINTER_COLUMNS from environment: %s", collectCRDPrinterCols) + + var err error + Cfg.CollectCRDPrinterColumns, err = strconv.ParseBool(collectCRDPrinterCols) + if err != nil { + glog.Errorf("Error parsing env COLLECT_CRD_PRINTER_COLUMNS, defaulting to false: %v", err) + } + } + if Cfg.DeployedInHub && Cfg.AggregatorConfigFile != "" { glog.Fatal("Config mismatch: DEPLOYED_IN_HUB is true, but HUB_CONFIG is set to connect to another hub") } else if !Cfg.DeployedInHub && Cfg.AggregatorConfigFile == "" { diff --git a/pkg/informer/informer.go 
b/pkg/informer/informer.go index dec95af9..b33274ff 100644 --- a/pkg/informer/informer.go +++ b/pkg/informer/informer.go @@ -12,7 +12,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -45,10 +44,10 @@ func InformerForResource(res schema.GroupVersionResource) (GenericInformer, erro } // Run runs the informer. -func (inform *GenericInformer) Run(stopper chan struct{}) { +func (inform *GenericInformer) Run(ctx context.Context) { for { select { - case <-stopper: + case <-ctx.Done(): glog.Info("Informer stopped. ", inform.gvr.String()) for key := range inform.resourceIndex { glog.V(5).Infof("Stopping informer %s and removing resource with UID: %s", inform.gvr.Resource, key) @@ -72,7 +71,7 @@ func (inform *GenericInformer) Run(stopper chan struct{}) { err := inform.listAndResync() if err == nil { inform.initialized = true - inform.watch(stopper) + inform.watch(ctx.Done()) } } } @@ -149,8 +148,7 @@ func (inform *GenericInformer) listAndResync() error { } // Watch resources and process events. -func (inform *GenericInformer) watch(stopper chan struct{}) { - +func (inform *GenericInformer) watch(stopper <-chan struct{}) { watch, watchError := inform.client.Resource(inform.gvr).Watch(context.TODO(), metav1.ListOptions{}) if watchError != nil { glog.Warningf("Error watching resources for %s. Error: %s", inform.gvr.String(), watchError) @@ -175,35 +173,35 @@ func (inform *GenericInformer) watch(stopper chan struct{}) { switch event.Type { case "ADDED": glog.V(5).Infof("Received ADDED event. Kind: %s ", inform.gvr.Resource) - o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object) - if error != nil { - glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on ADDED event. 
%s", - inform.gvr.Resource, error) + obj, ok := event.Object.(*unstructured.Unstructured) + if !ok { + glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on ADDED event.", + inform.gvr.Resource) + continue } - obj := &unstructured.Unstructured{Object: o} inform.AddFunc(obj) inform.resourceIndex[string(obj.GetUID())] = obj.GetResourceVersion() case "MODIFIED": glog.V(5).Infof("Received MODIFY event. Kind: %s ", inform.gvr.Resource) - o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object) - if error != nil { - glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on MODIFIED event. %s", - inform.gvr.Resource, error) + obj, ok := event.Object.(*unstructured.Unstructured) + if !ok { + glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on MODIFIED event", + inform.gvr.Resource) + continue } - obj := &unstructured.Unstructured{Object: o} inform.UpdateFunc(nil, obj) inform.resourceIndex[string(obj.GetUID())] = obj.GetResourceVersion() case "DELETED": glog.V(5).Infof("Received DELETED event. Kind: %s ", inform.gvr.Resource) - o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object) - if error != nil { - glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on DELETED event. 
%s", - inform.gvr.Resource, error) + obj, ok := event.Object.(*unstructured.Unstructured) + if !ok { + glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on DELETED event", + inform.gvr.Resource) + continue } - obj := &unstructured.Unstructured{Object: o} inform.DeleteFunc(obj) delete(inform.resourceIndex, string(obj.GetUID())) diff --git a/pkg/informer/informer_test.go b/pkg/informer/informer_test.go index 732e6a67..79eb188e 100644 --- a/pkg/informer/informer_test.go +++ b/pkg/informer/informer_test.go @@ -19,7 +19,7 @@ import ( var gvr = schema.GroupVersionResource{Group: "open-cluster-management.io", Version: "v1", Resource: "thekinds"} func fakeDynamicClient() *fake.FakeDynamicClient { - var gvrMapToKind = map[schema.GroupVersionResource]string{} + gvrMapToKind := map[schema.GroupVersionResource]string{} gvrMapToKind[gvr] = "thekindsList" scheme := runtime.NewScheme() scheme.AddKnownTypes(gvr.GroupVersion()) @@ -149,14 +149,14 @@ func Test_StoppedInformer_ValidateDeleteFunc(t *testing.T) { } // start informer - stopper := make(chan struct{}) - go informer.Run(stopper) + ctx, cancel := context.WithCancel(context.Background()) + go informer.Run(ctx) time.Sleep(10 * time.Millisecond) - //exist informer to trigger DeleteFunc - close(stopper) + // exit informer to trigger DeleteFunc + cancel() - //allow test to process + // allow test to process time.Sleep(10 * time.Millisecond) // Verify that the informer.DeleteFunc was called with uid=id-999 and uid=id-100 @@ -173,14 +173,14 @@ func Test_Run(t *testing.T) { informer, addFuncCount, deleteFuncCount, updateFuncCount := initInformer() // Start informer routine - stopper := make(chan struct{}) - go informer.Run(stopper) + ctx, cancel := context.WithCancel(context.Background()) + go informer.Run(ctx) time.Sleep(10 * time.Millisecond) generateSimpleEvent(informer, t) time.Sleep(10 * time.Millisecond) - close(stopper) + cancel() // Verify that informer.AddFunc is called for each of the mocked 
resources (6 times). if *addFuncCount != 6 { @@ -207,7 +207,10 @@ func Test_Run_retryBackoff(t *testing.T) { informer.AddFunc = func(interface{}) { retryTime = time.Now() } // Execute function - go informer.Run(make(chan struct{})) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go informer.Run(ctx) time.Sleep(2010 * time.Millisecond) // Verify backoff logic waits 2 seconds before retrying. diff --git a/pkg/informer/runInformers.go b/pkg/informer/runInformers.go index ef055501..4d98aa0d 100644 --- a/pkg/informer/runInformers.go +++ b/pkg/informer/runInformers.go @@ -3,8 +3,14 @@ package informer import ( + "context" + "errors" + "fmt" "strings" + "sync" "time" + "unicode" + "unicode/utf8" "github.com/golang/glog" "github.com/stolostron/search-collector/pkg/config" @@ -13,33 +19,442 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" ) +var crdGVR = schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", +} + +// transformCRD will strip a CRD to be an unstructured object with the following fields. Note that only the stored +// version is in the spec.versions array. 
+// +// apiVersion: apiextensions.k8s.io/v1 +// kind: CustomResourceDefinition +// metadata: +// name: "" +// generation: 1 +// resourceVersion: 1 +// spec: +// group: "" +// versions: +// - name: "" +// storage: true +// additionalPrinterColumns: [] +func transformCRD(obj interface{}) (interface{}, error) { + typedObj, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, errors.New("expected an Unstructured object") + } + + transformedObj := unstructured.Unstructured{} + transformedObj.SetAPIVersion(typedObj.GetAPIVersion()) + transformedObj.SetKind(typedObj.GetKind()) + transformedObj.SetName(typedObj.GetName()) + transformedObj.SetGeneration(typedObj.GetGeneration()) + transformedObj.SetResourceVersion(typedObj.GetResourceVersion()) + + group, _, err := unstructured.NestedString(typedObj.Object, "spec", "group") + if err != nil { + return nil, err + } + + err = unstructured.SetNestedField(transformedObj.Object, group, "spec", "group") + if err != nil { + return nil, err + } + + versions, _, err := unstructured.NestedSlice(typedObj.Object, "spec", "versions") + if err != nil { + return nil, err + } + + for _, version := range versions { + versionTyped, ok := version.(map[string]interface{}) + if !ok { + continue + } + + if storage, ok := versionTyped["storage"].(bool); !ok || !storage { + continue + } + + transformedVersions := []interface{}{ + map[string]interface{}{ + "name": versionTyped["name"], + "storage": versionTyped["storage"], + "additionalPrinterColumns": versionTyped["additionalPrinterColumns"], + }, + } + + err = unstructured.SetNestedField(transformedObj.Object, transformedVersions, "spec", "versions") + if err != nil { + return nil, err + } + } + + return &transformedObj, nil +} + +// gvrFromCRD parses the input CRD and returns the GVR of the stored version. Note that this +// relies on Kubernetes' enforcement of the CRD name always being in the . format +// and the plural resource name always being used as the resource name in the GVR. 
+// See the naming validation here: +// https://github.com/kubernetes/apiextensions-apiserver/blob/v0.31.0/pkg/apis/apiextensions/validation/validation.go#L74 +func gvrFromCRD(crd *unstructured.Unstructured) (*schema.GroupVersionResource, error) { + var versionName string + + versions, _, _ := unstructured.NestedSlice(crd.Object, "spec", "versions") + for _, version := range versions { + versionTyped, ok := version.(map[string]interface{}) + if !ok { + continue + } + + if storage, ok := versionTyped["storage"].(bool); !ok || !storage { + continue + } + + name, ok := versionTyped["name"].(string) + if !ok { + continue + } + + versionName = name + + break + } + + if versionName == "" { + return nil, fmt.Errorf("failed to find the stored version name for the CRD: %s", crd.GetName()) + } + + group, _, _ := unstructured.NestedString(crd.Object, "spec", "group") + if group == "" { + return nil, fmt.Errorf("failed to find the group of the CRD: %s", crd.GetName()) + } + + // CRDs must have a name in the format of . + // https://github.com/kubernetes/apiextensions-apiserver/blob/v0.31.0/pkg/apis/apiextensions/validation/validation.go#L74 + resource := strings.TrimSuffix(crd.GetName(), "."+group) + if resource == "" { + return nil, fmt.Errorf("failed to parse the resource name from the CRD: %s", crd.GetName()) + } + + return &schema.GroupVersionResource{ + Group: group, + Version: versionName, + Resource: resource, + }, nil +} + +// gvrToPrinterColumns is a concurrency safe mapping of the stored version of a CRD represented as a +// schema.GroupVersionResource with values of ExtractProperty slices which represent the additionalPrinterColumns. +type gvrToPrinterColumns struct { + lock sync.RWMutex + mapping map[schema.GroupVersionResource][]tr.ExtractProperty +} + +// toLowerCamelCase converts a UTF-8 string to lower camel case such as enforcementAction. Invalid UTF-8 words are +// ignored. The considered separators are spaces (e.g. "enforcement action") and dashes (e.g. 
enforcement-action). +func toLowerCamelCase(s string) string { + if s == "" { + return s + } + + // If there are no separators, then preserve the string as is. + if !strings.Contains(s, " ") && !strings.Contains(s, "-") { + return s + } + + s = strings.TrimSpace(s) + s = strings.ToLower(s) + + for _, separator := range []string{" ", "-"} { + parts := strings.Split(s, separator) + + newS := "" + + for i, part := range parts { + if i != 0 { + r, size := utf8.DecodeRuneInString(part) + // RuneError should never be returned because Kubernetes should ensure it's UTF-8 characters, but this is + // just an extra precaution. + if r == utf8.RuneError { + continue + } + + part = string(unicode.ToUpper(r)) + part[size:] + } + + newS += part + } + + s = newS + } + + return s +} + +// set will parse the GVR from the input CRD and set the additional printer columns in the mapping cache. +func (g *gvrToPrinterColumns) set(crd *unstructured.Unstructured) error { + gvr, err := gvrFromCRD(crd) + if err != nil { + return err + } + + var printerColumns []tr.ExtractProperty + + versions, _, _ := unstructured.NestedSlice(crd.Object, "spec", "versions") + for _, version := range versions { + versionTyped, ok := version.(map[string]interface{}) + if !ok { + continue + } + + name, _ := versionTyped["name"].(string) + if name != gvr.Version { + continue + } + + additionalPrinterColumns, ok := versionTyped["additionalPrinterColumns"].([]interface{}) + if !ok { + break + } + + for _, column := range additionalPrinterColumns { + columnTyped, ok := column.(map[string]interface{}) + if !ok { + continue + } + + name, ok := columnTyped["name"].(string) + if !ok { + continue + } + + jsonPath, ok := columnTyped["jsonPath"].(string) + if !ok { + continue + } + + // The additionalPrinterColumns always have a JSON path without curly braces enclosing it, but the + // ExtractProperty.JSONPath field expects them. 
+ jsonPath = fmt.Sprintf("{%s}", jsonPath) + + camelCaseName := toLowerCamelCase(name) + + if camelCaseName == "" { + continue + } + + printerColumns = append(printerColumns, tr.ExtractProperty{Name: camelCaseName, JSONPath: jsonPath}) + } + + break + } + + g.lock.Lock() + g.mapping[*gvr] = printerColumns + g.lock.Unlock() + + return nil +} + +// unset will parse the stored GVR from the input CRD and delete the mapping cache of it. +func (g *gvrToPrinterColumns) unset(crd *unstructured.Unstructured) error { + gvr, err := gvrFromCRD(crd) + if err != nil { + return err + } + + g.lock.Lock() + delete(g.mapping, *gvr) + g.lock.Unlock() + + return nil +} + +// get returns the entries in the additionalPrintedColumns array in the CRD. This uses a cache that must have been +// populated with set. +func (g *gvrToPrinterColumns) get(gvr schema.GroupVersionResource) []tr.ExtractProperty { + g.lock.RLock() + result := g.mapping[gvr] + g.lock.RUnlock() + + return result +} + +// getCRDInformer returns a started and synced +func getCRDInformer( + ctx context.Context, gvrToColumns *gvrToPrinterColumns, syncInformersQueue *workqueue.Type, +) (dynamicinformer.DynamicSharedInformerFactory, error) { + glog.Info("Starting the CRD informer") + + dynSharedInformer := dynamicinformer.NewDynamicSharedInformerFactory(config.GetDynamicClient(), 0) + + crdInformer := dynSharedInformer.ForResource(crdGVR) + crdInformer.Lister() + + err := crdInformer.Informer().SetTransform(transformCRD) + if err != nil { + return nil, err + } + + // The event handlers just add an empty struct to the syncInformersQueue when any CRD is created, updated, or + // deleted. Using the empty struct deduplicates the requests so that when multiple CRD updates occur while + // syncInformers is running, it'll only trigger one additional syncInformers run after the previous completes. 
+ _, err = crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + objTyped, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + + err := gvrToColumns.set(objTyped) + if err != nil { + glog.Errorf( + "Failed to parse the additionalPrinterColumns from the CRD (%s): %v", objTyped.GetName(), err, + ) + } + + syncInformersQueue.Add(struct{}{}) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldObjTyped, ok := oldObj.(*unstructured.Unstructured) + if !ok { + return + } + + newObjTyped, ok := newObj.(*unstructured.Unstructured) + if !ok { + return + } + + // Only resync if the spec of the CRD changed + if oldObjTyped.GetGeneration() == newObjTyped.GetGeneration() { + return + } + + err := gvrToColumns.set(newObjTyped) + if err != nil { + glog.Errorf( + "Failed to parse the additionalPrinterColumns from the CRD (%s): %v", newObjTyped.GetName(), err, + ) + } + + syncInformersQueue.Add(struct{}{}) + }, + DeleteFunc: func(obj interface{}) { + objTyped, ok := obj.(*unstructured.Unstructured) + if !ok { + unknownStateObj, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + + objTyped, ok = unknownStateObj.Obj.(*unstructured.Unstructured) + if !ok { + return + } + } + + err := gvrToColumns.unset(objTyped) + if err != nil { + glog.Errorf( + "Failed to parse the additionalPrinterColumns from the CRD (%s): %v", objTyped.GetName(), err, + ) + } + + syncInformersQueue.Add(struct{}{}) + }, + }) + if err != nil { + return nil, err + } + + dynSharedInformer.Start(ctx.Done()) + + glog.Info("Waiting for the CRD informer to sync") + + // Waiting for the CRD informer to sync means that the event handlers have all run for the results from the initial + // listing of all CRDs. This allows deduplicating the list requests to a single item in the queue. 
+ if !cache.WaitForCacheSync(ctx.Done(), crdInformer.Informer().HasSynced) { + return nil, errors.New("timed out waiting for the CRD informer to sync") + } + + // A bit of a hack to drain the queue from the informer performing the list query. We only really care about + // events after the initial sync. + if syncInformersQueue.Len() != 0 { + item, _ := syncInformersQueue.Get() + syncInformersQueue.Done(item) + } + + glog.Info("The CRD informer has started") + + return dynSharedInformer, nil +} + // Start and manages informers for resources in the cluster. -func RunInformers(initialized chan interface{}, upsertTransformer tr.Transformer, reconciler *rec.Reconciler) { +func RunInformers( + ctx context.Context, + initialized chan interface{}, + upsertTransformer tr.Transformer, + reconciler *rec.Reconciler, +) (err error) { + var wasInitialized bool + + gvrToColumns := gvrToPrinterColumns{mapping: map[schema.GroupVersionResource][]tr.ExtractProperty{}} + + defer func() { + // Always close the initialized channel if it was not already closed so the caller does not wait forever. + if !wasInitialized { + close(initialized) + } + }() + + syncInformersQueue := workqueue.NewWithConfig(workqueue.QueueConfig{}) + defer syncInformersQueue.ShutDown() + + dynSharedInformer, err := getCRDInformer(ctx, &gvrToColumns, syncInformersQueue) + if err != nil { + return err + } + + // Get kubernetes client for discovering resource types + discoveryClient := config.GetDiscoveryClient() + + // We keep each of the informer's stopper channel in a map, so we can stop them if the resource is no longer valid. + stoppers := make(map[schema.GroupVersionResource]context.CancelFunc) // These functions return handler functions, which are then used in creation of the informers. 
- createInformAddHandler := func(resourceName string) func(interface{}) { + createInformAddHandler := func(gvr schema.GroupVersionResource) func(interface{}) { return func(obj interface{}) { resource := obj.(*unstructured.Unstructured) upsert := tr.Event{ - Time: time.Now().Unix(), - Operation: tr.Create, - Resource: resource, - ResourceString: resourceName, + Time: time.Now().Unix(), + Operation: tr.Create, + Resource: resource, + ResourceString: gvr.Resource, + AdditionalPrinterColumns: gvrToColumns.get(gvr), } upsertTransformer.Input <- &upsert // Send resource into the transformer input channel } } - createInformUpdateHandler := func(resourceName string) func(interface{}, interface{}) { + createInformUpdateHandler := func(gvr schema.GroupVersionResource) func(interface{}, interface{}) { return func(oldObj, newObj interface{}) { resource := newObj.(*unstructured.Unstructured) upsert := tr.Event{ - Time: time.Now().Unix(), - Operation: tr.Update, - Resource: resource, - ResourceString: resourceName, + Time: time.Now().Unix(), + Operation: tr.Update, + Resource: resource, + ResourceString: gvr.Resource, + AdditionalPrinterColumns: gvrToColumns.get(gvr), } upsertTransformer.Input <- &upsert // Send resource into the transformer input channel } @@ -58,30 +473,65 @@ func RunInformers(initialized chan interface{}, upsertTransformer tr.Transformer reconciler.Input <- ne } - // Get kubernetes client for discovering resource types - discoveryClient := config.GetDiscoveryClient() - - // We keep each of the informer's stopper channel in a map, so we can stop them if the resource is no longer valid. 
- stoppers := make(map[schema.GroupVersionResource]chan struct{}) - // Initialize the informers - syncInformers(*discoveryClient, stoppers, createInformAddHandler, createInformUpdateHandler, informDeleteHandler) + syncInformers(ctx, *discoveryClient, stoppers, createInformAddHandler, createInformUpdateHandler, informDeleteHandler) + // Close the initialized channel so that we can start the sender. + wasInitialized = true close(initialized) - // Continue polling to keep the informers synchronized when CRDs are added or deleted in the cluster. + + lastSynced := time.Now() + minBetweenSyncs := 5 * time.Second + + // Keep the informers synchronized when CRDs are added or deleted in the cluster. for { - time.Sleep(time.Duration(config.Cfg.RediscoverRateMS) * time.Millisecond) - syncInformers(*discoveryClient, stoppers, createInformAddHandler, createInformUpdateHandler, informDeleteHandler) + select { + case <-ctx.Done(): + // The parent context canceled, so all the informers's child contexts will also be canceled, so no + // explicit clean up is needed. Ideally, this would wait for all the informers to have fully stopped before + // returning, but that state is not available here. + glog.Info("Waiting for the CRD informer to shutdown") + + // The informer is already shutting down since the parent context was canceled, but this call to Shutdown + // blocks until all of its goroutines have stopped. + dynSharedInformer.Shutdown() + + return + default: + + } + + syncRequest, shutdown := syncInformersQueue.Get() + if shutdown { + return + } + + // Add up to a 5 second delay to account for things such as a new operator adding multiple CRDs. 
+ sinceLastSync := time.Since(lastSynced) + + if sinceLastSync < minBetweenSyncs { + time.Sleep(minBetweenSyncs - sinceLastSync) + } + + syncInformers( + ctx, *discoveryClient, stoppers, createInformAddHandler, createInformUpdateHandler, informDeleteHandler, + ) + + lastSynced = time.Now() + + syncInformersQueue.Done(syncRequest) } } // Start or stop informers to match the resources (CRDs) available in the cluster. -func syncInformers(client discovery.DiscoveryClient, - stoppers map[schema.GroupVersionResource]chan struct{}, - createInformerAddHandler func(string) func(interface{}), - createInformerUpdateHandler func(string) func(interface{}, interface{}), - informerDeleteHandler func(obj interface{})) { - +func syncInformers( + ctx context.Context, + client discovery.DiscoveryClient, + stoppers map[schema.GroupVersionResource]context.CancelFunc, + createInformerAddHandler func(schema.GroupVersionResource) func(interface{}), + createInformerUpdateHandler func(schema.GroupVersionResource) func(interface{}, interface{}), + informerDeleteHandler func(obj interface{}), +) { glog.V(2).Infof("Synchronizing informers. 
Informers running: %d", len(stoppers)) gvrList, err := SupportedResources(client) @@ -102,7 +552,7 @@ func syncInformers(client discovery.DiscoveryClient, continue } else { // if it's in the old and NOT in the new, stop the informer glog.V(2).Infof("Stopping informer: %s", gvr.String()) - close(stopper) + stopper() delete(stoppers, gvr) } } @@ -114,13 +564,13 @@ func syncInformers(client discovery.DiscoveryClient, informer, _ := InformerForResource(gvr) // Set up handler to pass this informer's resources into transformer - informer.AddFunc = createInformerAddHandler(gvr.Resource) - informer.UpdateFunc = createInformerUpdateHandler(gvr.Resource) + informer.AddFunc = createInformerAddHandler(gvr) + informer.UpdateFunc = createInformerUpdateHandler(gvr) informer.DeleteFunc = informerDeleteHandler - stopper := make(chan struct{}) - stoppers[gvr] = stopper - go informer.Run(stopper) + informerCtx, informerCancel := context.WithCancel(ctx) + stoppers[gvr] = informerCancel + go informer.Run(informerCtx) // This wait serializes the informer initialization. It is needed to avoid a // spike in memory when the collector starts. informer.WaitUntilInitialized(time.Duration(10) * time.Second) // Times out after 10 seconds. 
diff --git a/pkg/informer/runInformers_test.go b/pkg/informer/runInformers_test.go index 48d912fa..5dfe5dec 100644 --- a/pkg/informer/runInformers_test.go +++ b/pkg/informer/runInformers_test.go @@ -3,22 +3,26 @@ package informer import ( + "context" "encoding/json" "net/http" "net/http/httptest" "testing" + tr "github.com/stolostron/search-collector/pkg/transforms" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" ) -var mockAddFn = func(s string) func(interface{}) { +var mockAddFn = func(gvr schema.GroupVersionResource) func(interface{}) { return func(o interface{}) {} } -var mockUpdateFn = func(s string) func(interface{}, interface{}) { + +var mockUpdateFn = func(gvr schema.GroupVersionResource) func(interface{}, interface{}) { return func(old interface{}, new interface{}) {} } @@ -64,13 +68,15 @@ func fakeDiscoveryClient() (*httptest.Server, discovery.DiscoveryClient) { } func Test_syncInformers(t *testing.T) { - - mockStoppers := make(map[schema.GroupVersionResource]chan struct{}) + mockStoppers := make(map[schema.GroupVersionResource]context.CancelFunc) fakeServer, fakeClient := fakeDiscoveryClient() defer fakeServer.Close() - syncInformers(fakeClient, mockStoppers, mockAddFn, mockUpdateFn, mockDeleteHandler) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + syncInformers(ctx, fakeClient, mockStoppers, mockAddFn, mockUpdateFn, mockDeleteHandler) assert.Equal(t, 3, len(mockStoppers)) @@ -81,14 +87,14 @@ func Test_syncInformers(t *testing.T) { // Validate that informer is stopped when resource no longer exists. 
func Test_syncInformers_removeInformers(t *testing.T) { - mockStoppers := make(map[schema.GroupVersionResource]chan struct{}) - stopper := make(chan struct{}) - mockStoppers[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "notExist"}] = stopper + mockStoppers := make(map[schema.GroupVersionResource]context.CancelFunc) + ctx, cancel := context.WithCancel(context.Background()) + mockStoppers[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "notExist"}] = cancel fakeServer, fakeClient := fakeDiscoveryClient() defer fakeServer.Close() - syncInformers(fakeClient, mockStoppers, mockAddFn, mockUpdateFn, mockDeleteHandler) + syncInformers(ctx, fakeClient, mockStoppers, mockAddFn, mockUpdateFn, mockDeleteHandler) assert.Equal(t, 3, len(mockStoppers)) @@ -97,3 +103,163 @@ func Test_syncInformers_removeInformers(t *testing.T) { assert.False(t, exists) assert.Nil(t, informStopper) } + +func getSimpleTransformedCRD() unstructured.Unstructured { + return unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": map[string]interface{}{ + "name": "fakes.policy.open-cluster-management.io", + "generation": int64(1), + "resourceVersion": "1", + }, + "spec": map[string]interface{}{ + "group": "policy.open-cluster-management.io", + "versions": []interface{}{ + map[string]interface{}{ + "additionalPrinterColumns": []interface{}{ + map[string]interface{}{ + "jsonPath": ".status.compliant", + "name": "Compliance state", + "type": "string", + }, + }, + "name": "v1", + "storage": true, + }, + }, + }, + }, + } +} + +func Test_transformCRD(t *testing.T) { + crd := unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": map[string]interface{}{ + "name": "fakes.policy.open-cluster-management.io", + "generation": int64(1), + "resourceVersion": "1", + "labels": 
map[string]interface{}{ + "hello": "world", + }, + }, + "spec": map[string]interface{}{ + "group": "policy.open-cluster-management.io", + "versions": []interface{}{ + map[string]interface{}{ + "name": "v1beta1", + "storage": false, + "served": true, + }, + map[string]interface{}{ + "additionalPrinterColumns": []interface{}{ + map[string]interface{}{ + "jsonPath": ".status.compliant", + "name": "Compliance state", + "type": "string", + }, + }, + "name": "v1", + "storage": true, + "served": true, + }, + }, + }, + }, + } + + expectedCRD := getSimpleTransformedCRD() + + transformedCRD, err := transformCRD(&crd) + assert.Nil(t, err) + assert.Equal(t, expectedCRD.Object, transformedCRD.(*unstructured.Unstructured).Object) +} + +func Test_gvrFromCRD(t *testing.T) { + crd := getSimpleTransformedCRD() + + gvr, err := gvrFromCRD(&crd) + assert.Nil(t, err) + + expected := schema.GroupVersionResource{ + Group: "policy.open-cluster-management.io", + Version: "v1", + Resource: "fakes", + } + + assert.Equal(t, expected, *gvr) +} + +func Test_toLowerCamelCase(t *testing.T) { + assert.Equal(t, "enforcementAction", toLowerCamelCase("enforcement action")) + assert.Equal(t, "enforcementAction", toLowerCamelCase("enforcement-action")) + assert.Equal(t, "enforcementAction", toLowerCamelCase("enforcementAction")) +} + +func Test_gvrToPrinterColumns(t *testing.T) { + gvrToColumns := gvrToPrinterColumns{mapping: map[schema.GroupVersionResource][]tr.ExtractProperty{}} + + gvr := schema.GroupVersionResource{ + Group: "policy.open-cluster-management.io", + Version: "v1", + Resource: "fakes", + } + + // At first, the mapping should be empty. 
+ assert.Nil(t, gvrToColumns.get(gvr)) + + crd := unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": map[string]interface{}{ + "name": "fakes.policy.open-cluster-management.io", + "generation": int64(1), + "resourceVersion": "1", + "labels": map[string]interface{}{ + "hello": "world", + }, + }, + "spec": map[string]interface{}{ + "group": "policy.open-cluster-management.io", + "versions": []interface{}{ + map[string]interface{}{ + "name": "v1beta1", + "storage": false, + "served": true, + }, + map[string]interface{}{ + "additionalPrinterColumns": []interface{}{ + map[string]interface{}{ + "jsonPath": ".status.compliant", + "name": "Compliance state", + "type": "string", + }, + }, + "name": "v1", + "storage": true, + "served": true, + }, + }, + }, + }, + } + + // Cache the mapping for the CRD + assert.Nil(t, gvrToColumns.set(&crd)) + + expected := []tr.ExtractProperty{ + {Name: "complianceState", JSONPath: "{.status.compliant}"}, + } + + // Verify the correct GVR was parsed and the mapping was stored + assert.Equal(t, expected, gvrToColumns.get(gvr)) + + // Verify the mapping can be unset + assert.Nil(t, gvrToColumns.unset(&crd)) + assert.Nil(t, gvrToColumns.get(gvr)) +} diff --git a/pkg/send/sender.go b/pkg/send/sender.go index 181e36de..72164e8d 100644 --- a/pkg/send/sender.go +++ b/pkg/send/sender.go @@ -12,6 +12,7 @@ package send import ( "bytes" + "context" "crypto/rand" "encoding/json" "errors" @@ -296,12 +297,17 @@ func (s *Sender) Sync() error { // Starts the send loop to send data on an interval. // In case of error it backoffs and retries. -func (s *Sender) StartSendLoop() { - +func (s *Sender) StartSendLoop(ctx context.Context) { // Used for exponential backoff, increased each interval. Has to be a float64 since I use it with math.Exp2() backoffFactor := 1 // Note: must be 1. Using 0 will send the next payload immediately. 
for { + select { + case <-ctx.Done(): + return + default: + } + glog.V(3).Info("Beginning Send Cycle") err := s.Sync() if err != nil { diff --git a/pkg/transforms/genericResourceConfig.go b/pkg/transforms/genericResourceConfig.go index 4124f2b9..e14045db 100644 --- a/pkg/transforms/genericResourceConfig.go +++ b/pkg/transforms/genericResourceConfig.go @@ -2,8 +2,8 @@ package transforms // Declares a property to extract from a resource using jsonpath. type ExtractProperty struct { - name string // `json:"name,omitempty"` - jsonpath string // `json:"jsonpath,omitempty"` + Name string // `json:"name,omitempty"` + JSONPath string // `json:"jsonpath,omitempty"` } // Declares the properties to extract from a given resource. @@ -11,36 +11,53 @@ type ResourceConfig struct { properties []ExtractProperty // `json:"properties,omitempty"` } +var ( + defaultTransformIgnoredFields = map[string]bool{ + // Skip age since this likely duplicates "created" that is set by genericProperties. + "age": true, + } + knownStringArrays = map[string]bool{ + "accessMode": true, + "category": true, + "container": true, + "image": true, + "port": true, + "role": true, + "rules": true, + "subject": true, + } +) + // Declares properties to extract from the resource by default. 
var defaultTransformConfig = map[string]ResourceConfig{ - "ClusterServiceVersion.operators.coreos.com": ResourceConfig{ + "ClusterServiceVersion.operators.coreos.com": { properties: []ExtractProperty{ - ExtractProperty{name: "version", jsonpath: "{.spec.version}"}, - ExtractProperty{name: "display", jsonpath: "{.spec.displayName}"}, - ExtractProperty{name: "phase", jsonpath: "{.status.phase}"}, + {Name: "version", JSONPath: "{.spec.version}"}, + {Name: "display", JSONPath: "{.spec.displayName}"}, + {Name: "phase", JSONPath: "{.status.phase}"}, }, }, - "Subscription.operators.coreos.com": ResourceConfig{ + "Subscription.operators.coreos.com": { properties: []ExtractProperty{ - ExtractProperty{name: "source", jsonpath: "{.spec.source}"}, - ExtractProperty{name: "package", jsonpath: "{.spec.name}"}, - ExtractProperty{name: "channel", jsonpath: "{.spec.channel}"}, - ExtractProperty{name: "installplan", jsonpath: "{.status.installedCSV}"}, - ExtractProperty{name: "phase", jsonpath: "{.status.state}"}, + {Name: "source", JSONPath: "{.spec.source}"}, + {Name: "package", JSONPath: "{.spec.name}"}, + {Name: "channel", JSONPath: "{.spec.channel}"}, + {Name: "installplan", JSONPath: "{.status.installedCSV}"}, + {Name: "phase", JSONPath: "{.status.state}"}, }, }, - "ClusterOperator.config.openshift.io": ResourceConfig{ + "ClusterOperator.config.openshift.io": { properties: []ExtractProperty{ - ExtractProperty{name: "version", jsonpath: `{.status.versions[?(@.name=="operator")].version}`}, - ExtractProperty{name: "available", jsonpath: `{.status.conditions[?(@.type=="Available")].status}`}, - ExtractProperty{name: "progressing", jsonpath: `{.status.conditions[?(@.type=="Progressing")].status}`}, - ExtractProperty{name: "degraded", jsonpath: `{.status.conditions[?(@.type=="Degraded")].status}`}, + {Name: "version", JSONPath: `{.status.versions[?(@.name=="operator")].version}`}, + {Name: "available", JSONPath: `{.status.conditions[?(@.type=="Available")].status}`}, + {Name: 
"progressing", JSONPath: `{.status.conditions[?(@.type=="Progressing")].status}`}, + {Name: "degraded", JSONPath: `{.status.conditions[?(@.type=="Degraded")].status}`}, }, }, - "VirtualMachine.kubevirt.io": ResourceConfig{ + "VirtualMachine.kubevirt.io": { properties: []ExtractProperty{ - ExtractProperty{name: "status", jsonpath: `{.status.printableStatus}`}, - ExtractProperty{name: "ready", jsonpath: `{.status.conditions[?(@.type=='Ready')].status}`}, + {Name: "status", JSONPath: `{.status.printableStatus}`}, + {Name: "ready", JSONPath: `{.status.conditions[?(@.type=='Ready')].status}`}, }, }, } diff --git a/pkg/transforms/genericresource.go b/pkg/transforms/genericresource.go index dedd23a7..429ae8cb 100644 --- a/pkg/transforms/genericresource.go +++ b/pkg/transforms/genericresource.go @@ -4,7 +4,6 @@ package transforms import ( - "fmt" "strings" "time" @@ -22,7 +21,7 @@ type GenericResource struct { // Builds a GenericResource node. // Extract default properties from unstructured resource. // Supports extracting additional properties defined by the transform config. -func GenericResourceBuilder(r *unstructured.Unstructured) *GenericResource { +func GenericResourceBuilder(r *unstructured.Unstructured, additionalColumns ...ExtractProperty) *GenericResource { n := Node{ UID: prefixedUID(r.GetUID()), Properties: genericProperties(r), @@ -33,33 +32,65 @@ func GenericResourceBuilder(r *unstructured.Unstructured) *GenericResource { group := r.GroupVersionKind().Group kind := r.GetKind() transformConfig, found := getTransformConfig(group, kind) - if found { - for _, prop := range transformConfig.properties { - jp := jsonpath.New(prop.name) - parseErr := jp.Parse(prop.jsonpath) - if parseErr != nil { - klog.Errorf("Error parsing jsonpath [%s] for [%s.%s] prop: [%s]. 
Reason: %v", - prop.jsonpath, kind, group, prop.name, parseErr) - continue - } - result, err := jp.FindResults(r.Object) - if err != nil { - // This error isn't always indicative of a problem, for example, when the object is created, it - // won't have a status yet, so the jsonpath returns an error until controller adds the status. - klog.V(1).Infof("Unable to extract prop [%s] from [%s.%s] Name: [%s]. Reason: %v", - prop.name, kind, group, r.GetName(), err) - continue - } - if len(result) > 0 && len(result[0]) > 0 { - n.Properties[prop.name] = fmt.Sprintf("%s", result[0][0]) - } else { - klog.Errorf("Unexpected error extracting [%s] from [%s.%s] Name: [%s]. Result object is empty.", - prop.name, kind, group, r.GetName()) - continue + + // Currently, only pull in the additionalPrinterColumns listed in the CRD if it's a Gatekeeper + // constraint or globally enabled. + if !found && (config.Cfg.CollectCRDPrinterColumns || group == "constraints.gatekeeper.sh") { + transformConfig = ResourceConfig{properties: additionalColumns} + } + + for _, prop := range transformConfig.properties { + // Skip properties that are already set. This could happen if additionalPrinterColumns + // is overriding a generic property. + if _, ok := n.Properties[prop.Name]; ok { + continue + } + + // Skip additionalPrinterColumns that should be ignored. + if !found && defaultTransformIgnoredFields[prop.Name] { + continue + } + + jp := jsonpath.New(prop.Name) + parseErr := jp.Parse(prop.JSONPath) + if parseErr != nil { + klog.Errorf("Error parsing jsonpath [%s] for [%s.%s] prop: [%s]. Reason: %v", + prop.JSONPath, kind, group, prop.Name, parseErr) + continue + } + + result, err := jp.FindResults(r.Object) + if err != nil { + // This error isn't always indicative of a problem, for example, when the object is created, it + // won't have a status yet, so the jsonpath returns an error until controller adds the status. + klog.V(1).Infof("Unable to extract prop [%s] from [%s.%s] Name: [%s]. 
Reason: %v", + prop.Name, kind, group, r.GetName(), err) + continue + } + + if len(result) > 0 && len(result[0]) > 0 { + val := result[0][0].Interface() + + if knownStringArrays[prop.Name] { + if _, ok := val.([]string); !ok { + klog.V(1).Infof("Ignoring the property [%s] from [%s.%s] Name: [%s]. Reason: not a string slice", + prop.Name, kind, group, r.GetName()) + continue + } } + + n.Properties[prop.Name] = val + } else { + klog.Errorf("Unexpected error extracting [%s] from [%s.%s] Name: [%s]. Result object is empty.", + prop.Name, kind, group, r.GetName()) + continue } + } + + if found { klog.V(5).Infof("Built [%s.%s] using transform config.\nNode: %+v\n", kind, group, n) } + return &GenericResource{node: n} } diff --git a/pkg/transforms/policy.go b/pkg/transforms/policy.go index 5ad15f4b..1a857b4d 100644 --- a/pkg/transforms/policy.go +++ b/pkg/transforms/policy.go @@ -45,21 +45,23 @@ func PolicyResourceBuilder(p *policy.Policy) *PolicyResource { return &PolicyResource{node: node} } -// For cert, config, operator policies. -// This function returns `annotations`, `_isExternal` for `source`, -// and `severity`, `compliant`, and `remediationAction`. -func getPolicyCommonProperties(c *unstructured.Unstructured, node Node) Node { - node.Properties["_isExternal"] = false - +func getIsPolicyExternal(c *unstructured.Unstructured) bool { for _, m := range c.GetManagedFields() { if m.Manager == "multicluster-operators-subscription" || strings.Contains(m.Manager, "argocd") { - node.Properties["_isExternal"] = true - - break + return true } } + return false +} + +// For cert, config, operator policies. +// This function returns `annotations`, `_isExternal` for `source`, +// and `severity`, `compliant`, and `remediationAction`. 
+func getPolicyCommonProperties(c *unstructured.Unstructured, node Node) Node { + node.Properties["_isExternal"] = getIsPolicyExternal(c) + typeMeta := metav1.TypeMeta{ Kind: c.GetKind(), APIVersion: c.GetAPIVersion(), diff --git a/pkg/transforms/transformer.go b/pkg/transforms/transformer.go index cc569d7c..dcec23ed 100644 --- a/pkg/transforms/transformer.go +++ b/pkg/transforms/transformer.go @@ -49,10 +49,11 @@ const ( // This type is used for add and update events. type Event struct { - Time int64 - Operation Operation - Resource *unstructured.Unstructured - ResourceString string // This is a plural identifier of the kind. + Time int64 + Operation Operation + Resource *unstructured.Unstructured + ResourceString string // This is a plural identifier of the kind. + AdditionalPrinterColumns []ExtractProperty // The entries from the additionalPrinterColumns array in the CRD. } // A generic node type that is passed to the aggregator to store in the database. @@ -412,7 +413,14 @@ func TransformRoutine(input chan *Event, output chan NodeEvent) { trans = PolicyReportResourceBuilder(&typedResource) default: - trans = GenericResourceBuilder(event.Resource) + generic := GenericResourceBuilder(event.Resource, event.AdditionalPrinterColumns...) + + // Gatekeeper constraint kinds are user defined, so key on just the API group to add an additional property. 
+ if apiGroup == "constraints.gatekeeper.sh" { + generic.node.Properties["_isExternal"] = getIsPolicyExternal(event.Resource) + } + + trans = generic } output <- NewNodeEvent(event, trans, event.ResourceString) diff --git a/pkg/transforms/transformer_test.go b/pkg/transforms/transformer_test.go index 54ce4d8e..58bc6c59 100644 --- a/pkg/transforms/transformer_test.go +++ b/pkg/transforms/transformer_test.go @@ -50,7 +50,35 @@ func TestTransformRoutine(t *testing.T) { addonNode := KlusterletAddonConfigResourceBuilder(&addonTyped).BuildNode() addonNode.ResourceString = "klusterletaddonconfigs" - var tests = []struct { + unstructGatekeeperConstraint := unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "constraints.gatekeeper.sh/v1beta1", + "kind": "K8sRequiredLabels", + "metadata": map[string]interface{}{ + "name": "ns-must-have-gk", + "uid": "783472a78", + }, + "spec": map[string]interface{}{ + "enforcementAction": "dryrun", + "other": "value", + }, + "status": map[string]interface{}{ + "auditTimestamp": "2024-08-26T19:12:02Z", + "totalViolations": 84, + }, + }, + } + gatekeeperPrinterColumns := []ExtractProperty{ + {Name: "enforcementAction", JSONPath: "{.spec.enforcementAction}"}, + {Name: "totalViolations", JSONPath: "{.status.totalViolations}"}, + } + gatekeeperConstraintNode := GenericResourceBuilder( + &unstructGatekeeperConstraint, gatekeeperPrinterColumns..., + ).BuildNode() + gatekeeperConstraintNode.ResourceString = "k8srequiredlabels" + gatekeeperConstraintNode.Properties["_isExternal"] = false + + tests := []struct { name string in *Event expected NodeEvent @@ -111,6 +139,21 @@ func TestTransformRoutine(t *testing.T) { Operation: Create, }, }, + { + "Gatekeeper constraint create", + &Event{ + Time: ts, + Operation: Create, + Resource: &unstructGatekeeperConstraint, + ResourceString: "k8srequiredlabels", + AdditionalPrinterColumns: gatekeeperPrinterColumns, + }, + NodeEvent{ + Node: gatekeeperConstraintNode, + Time: ts, + 
Operation: Create, + }, + }, } go TransformRoutine(input, output)