From c8c17db420fefe04998f3a4bba0f429330ae04d1 Mon Sep 17 00:00:00 2001
From: Meng Yan
Date: Sun, 28 Apr 2024 11:30:01 +0800
Subject: [PATCH] :bug: Wait until the resources are ready to start controller-manager (#890)

* Wait until the resources are ready to start controller-manager

Signed-off-by: myan

* fmt

Signed-off-by: Meng Yan

* add crd controller

Signed-off-by: Meng Yan

* add global hub controller

Signed-off-by: Meng Yan

* fix the backup

Signed-off-by: Meng Yan

* fix backup error

Signed-off-by: Meng Yan

* skip the middleware test

Signed-off-by: Meng Yan

* dynamically cache

Signed-off-by: Meng Yan

* remove dynamic cache controller

Signed-off-by: Meng Yan

* fix hubofhub test

Signed-off-by: Meng Yan

* f

Signed-off-by: Meng Yan

* remove unnecessary code

Signed-off-by: Meng Yan

---------

Signed-off-by: myan
Signed-off-by: Meng Yan
---
 operator/main.go                              | 290 +-----------------
 operator/main_test.go                         |  10 +-
 operator/pkg/config/cache_config.go           | 120 ++++++++
 operator/pkg/config/controller_config.go      |  97 ++++++
 .../config/multiclusterglobalhub_config.go    |  42 ---
 operator/pkg/config/operator_config.go        |  10 +
 operator/pkg/config/scheme_config.go          |  60 ++++
 operator/pkg/config/transport_config.go       |  35 +++
 .../pkg/controllers/addon/addon_controller.go |  33 +-
 .../addon/addon_controller_manifests.go       |  47 +--
 .../addon/addon_controller_manifests_test.go  |   8 +-
 .../pkg/controllers/addon/addon_installer.go  |  72 +++--
 .../controllers/addon/addon_installer_test.go |  27 +-
 .../addon/addon_integration_test.go           |   2 +
 .../pkg/controllers/addon/addon_suite_test.go | 111 +------
 .../pkg/controllers/backup/backup_start.go    |  30 +-
 .../controllers/backup/integration_test.go    |   1 -
 .../pkg/controllers/crd/crd_controller.go     | 195 ++++++++++++
 .../controllers/crd/crd_controller_test.go    | 170 ++++++++++
 .../hubofhubs/globalhub_condition.go          |   4 +-
 .../hubofhubs/globalhub_controller.go         |  57 ++--
 .../hubofhubs/globalhub_kafka_controller.go   |  73 +----
 .../hubofhubs/globalhub_manager.go            |  15 +-
 .../hubofhubs/globalhub_middleware.go         |  22 +-
 .../controllers/hubofhubs/integration_test.go |   1 -
 .../pkg/controllers/hubofhubs/suite_test.go   |  11 +-
 26 files changed, 890 insertions(+), 653 deletions(-)
 create mode 100644 operator/pkg/config/cache_config.go
 create mode 100644 operator/pkg/config/controller_config.go
 create mode 100644 operator/pkg/config/operator_config.go
 create mode 100644 operator/pkg/config/scheme_config.go
 create mode 100644 operator/pkg/config/transport_config.go
 create mode 100644 operator/pkg/controllers/crd/crd_controller.go
 create mode 100644 operator/pkg/controllers/crd/crd_controller_test.go

diff --git a/operator/main.go b/operator/main.go
index 87826f6a3..242e0d683 100644
--- a/operator/main.go
+++ b/operator/main.go
@@ -20,113 +20,23 @@ import (
 	"context"
 	"flag"
 	"os"
-	"strconv"
 	"time"
 
-	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
-	// to ensure that exec-entrypoint and run can make use of them.
-	kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
-	routev1 "github.com/openshift/api/route/v1"
-	subv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
-	operatorsv1 "github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apis/operators/v1"
-	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
-	// to ensure that exec-entrypoint and run can make use of them.
- promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/spf13/pflag" - agentv1 "github.com/stolostron/klusterlet-addon-controller/pkg/apis/agent/v1" - mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1" - admissionregistrationv1 "k8s.io/api/admissionregistration/v1" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" - addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" - clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2" - workv1 "open-cluster-management.io/api/work/v1" - policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - chnv1 "open-cluster-management.io/multicloud-operators-channel/pkg/apis/apps/v1" - placementrulesv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" - appsubv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1" - appsubV1alpha1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1alpha1" - applicationv1beta1 "sigs.k8s.io/application/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" - operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" - hubofhubsaddon "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon" - backupcontrollers "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/backup" - hubofhubscontrollers "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs" - "github.com/stolostron/multicluster-global-hub/pkg/constants" - commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" + "github.com/stolostron/multicluster-global-hub/operator/pkg/config" + "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/crd" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) -var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") - - labelSelector = labels.SelectorFromSet( - labels.Set{ - constants.GlobalHubOwnerLabelKey: constants.GHOperatorOwnerLabelVal, - }, - ) - namespacePath = "metadata.namespace" -) - -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(routev1.AddToScheme(scheme)) - utilruntime.Must(operatorsv1.AddToScheme(scheme)) - utilruntime.Must(clusterv1.AddToScheme(scheme)) - utilruntime.Must(clusterv1beta1.AddToScheme(scheme)) - utilruntime.Must(clusterv1beta2.AddToScheme(scheme)) - utilruntime.Must(workv1.AddToScheme(scheme)) - utilruntime.Must(addonv1alpha1.AddToScheme(scheme)) - 
utilruntime.Must(globalhubv1alpha4.AddToScheme(scheme)) - utilruntime.Must(appsubv1.SchemeBuilder.AddToScheme(scheme)) - utilruntime.Must(appsubV1alpha1.AddToScheme(scheme)) - utilruntime.Must(subv1alpha1.AddToScheme(scheme)) - utilruntime.Must(chnv1.AddToScheme(scheme)) - utilruntime.Must(placementrulesv1.AddToScheme(scheme)) - utilruntime.Must(policyv1.AddToScheme(scheme)) - utilruntime.Must(applicationv1beta1.AddToScheme(scheme)) - utilruntime.Must(admissionregistrationv1.AddToScheme(scheme)) - utilruntime.Must(mchv1.AddToScheme(scheme)) - utilruntime.Must(agentv1.SchemeBuilder.AddToScheme(scheme)) - utilruntime.Must(promv1.AddToScheme(scheme)) - utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) - - // add Kafka scheme - utilruntime.Must(kafkav1beta2.AddToScheme(scheme)) - - // +kubebuilder:scaffold:scheme -} - -type operatorConfig struct { - MetricsAddress string - ProbeAddress string - PodNamespace string - LeaderElection bool - GlobalResourceEnabled bool - LogLevel string -} +var setupLog = ctrl.Log.WithName("setup") func main() { os.Exit(doMain(ctrl.SetupSignalHandler(), ctrl.GetConfigOrDie())) @@ -142,84 +52,21 @@ func doMain(ctx context.Context, cfg *rest.Config) int { return 1 } - controllerConfigMap, err := kubeClient.CoreV1().ConfigMaps(utils.GetDefaultNamespace()).Get( - ctx, operatorconstants.ControllerConfig, metav1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - setupLog.Error(err, "failed to get controller config") - return 1 - } - controllerConfigMap = nil - } - - electionConfig, err := getElectionConfig(controllerConfigMap) + err = config.LoadControllerConfig(ctx, kubeClient) if err != nil { - setupLog.Error(err, "failed to get election config") + setupLog.Error(err, "failed to load controller config") return 1 } - mgr, err := getManager(cfg, electionConfig, operatorConfig) + mgr, err := getManager(cfg, operatorConfig) if err != nil { setupLog.Error(err, "unable to start manager") return 1 } - // middlewareCfg is shared between all controllers - middlewareCfg := &hubofhubscontrollers.MiddlewareConfig{} - - // start addon controller - if err = (&hubofhubsaddon.HoHAddonInstaller{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("addon-reconciler"), - }).SetupWithManager(ctx, mgr); err != nil { - setupLog.Error(err, "unable to create addon reconciler") - return 1 - } - - addonController, err := hubofhubsaddon.NewHoHAddonController(mgr.GetConfig(), mgr.GetClient(), - electionConfig, middlewareCfg, operatorConfig.GlobalResourceEnabled, controllerConfigMap, operatorConfig.LogLevel) + _, err = crd.AddCRDController(mgr, operatorConfig, kubeClient) if err != nil { - setupLog.Error(err, "unable to create addon controller") - return 1 - } - if err = mgr.Add(addonController); err != nil { - setupLog.Error(err, "unable to add addon controller to manager") - return 1 - } - - if err = (&hubofhubscontrollers.GlobalHubConditionReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("condition-reconciler"), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create GlobalHubStatusReconciler") - return 1 - } - - r := &hubofhubscontrollers.MulticlusterGlobalHubReconciler{ - Manager: mgr, - Client: mgr.GetClient(), - AddonManager: addonController.AddonManager(), - KubeClient: kubeClient, - Scheme: mgr.GetScheme(), - LeaderElection: electionConfig, - Log: ctrl.Log.WithName("global-hub-reconciler"), - MiddlewareConfig: middlewareCfg, - EnableGlobalResource: operatorConfig.GlobalResourceEnabled, - LogLevel: operatorConfig.LogLevel, - } - - 
if err = r.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create MulticlusterGlobalHubReconciler") - return 1 - } - - backupController := backupcontrollers.NewBackupReconciler( - mgr, - ctrl.Log.WithName("backup-reconciler"), - ) - - if err = mgr.Add(backupController); err != nil { - setupLog.Error(err, "unable to bakcup controller to manager") + setupLog.Error(err, "unable to create crd controller") return 1 } @@ -241,8 +88,8 @@ func doMain(ctx context.Context, cfg *rest.Config) int { return 0 } -func parseFlags() *operatorConfig { - config := &operatorConfig{ +func parseFlags() *config.OperatorConfig { + config := &config.OperatorConfig{ PodNamespace: utils.GetDefaultNamespace(), } @@ -271,15 +118,17 @@ func parseFlags() *operatorConfig { return config } -func getManager(restConfig *rest.Config, electionConfig *commonobjects.LeaderElectionConfig, - operatorConfig *operatorConfig, -) (ctrl.Manager, error) { +func getManager(restConfig *rest.Config, operatorConfig *config.OperatorConfig) (ctrl.Manager, error) { + electionConfig, err := config.GetElectionConfig() + if err != nil { + return nil, err + } leaseDuration := time.Duration(electionConfig.LeaseDuration) * time.Second renewDeadline := time.Duration(electionConfig.RenewDeadline) * time.Second retryPeriod := time.Duration(electionConfig.RetryPeriod) * time.Second mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ - Scheme: scheme, + Scheme: config.GetRuntimeScheme(), Metrics: metricsserver.Options{ BindAddress: operatorConfig.MetricsAddress, }, @@ -290,113 +139,8 @@ func getManager(restConfig *rest.Config, electionConfig *commonobjects.LeaderEle LeaseDuration: &leaseDuration, RenewDeadline: &renewDeadline, RetryPeriod: &retryPeriod, - NewCache: initCache, + NewCache: config.InitCache, }) return mgr, err } - -func getElectionConfig(configMap *corev1.ConfigMap) (*commonobjects.LeaderElectionConfig, error) { - config := &commonobjects.LeaderElectionConfig{ - LeaseDuration: 137, - RenewDeadline: 107, - RetryPeriod: 26, - } - if configMap == nil { - return config, nil - } - val, leaseDurationExist := configMap.Data["leaseDuration"] - if leaseDurationExist { - leaseDurationSec, err := strconv.Atoi(val) - if err != nil { - return nil, err - } - config.LeaseDuration = leaseDurationSec - } - - val, renewDeadlineExist := configMap.Data["renewDeadline"] - if renewDeadlineExist { - renewDeadlineSec, err := strconv.Atoi(val) - if err != nil { - return nil, err - } - config.RenewDeadline = renewDeadlineSec - } - - val, retryPeriodExist := configMap.Data["retryPeriod"] - if retryPeriodExist { - retryPeriodSec, err := strconv.Atoi(val) - if err != nil { - return nil, err - } - config.RetryPeriod = retryPeriodSec - } - - return config, nil -} - -func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) { - cacheOpts.ByObject = map[client.Object]cache.ByObject{ - &corev1.Secret{}: { - Field: fields.OneTermEqualSelector(namespacePath, utils.GetDefaultNamespace()), - }, - &corev1.ConfigMap{}: { - Field: fields.OneTermEqualSelector(namespacePath, utils.GetDefaultNamespace()), - }, - &corev1.ServiceAccount{}: { - Label: labelSelector, - }, - &corev1.Service{}: { - Label: labelSelector, - }, - &corev1.Namespace{}: {}, - &appsv1.Deployment{}: { - Label: labelSelector, - }, - &appsv1.StatefulSet{}: { - Label: labelSelector, - }, - &rbacv1.Role{}: { - Label: labelSelector, - }, - &rbacv1.RoleBinding{}: { - Label: labelSelector, - }, - &rbacv1.ClusterRole{}: { - Label: labelSelector, - }, - 
&rbacv1.ClusterRoleBinding{}: { - Label: labelSelector, - }, - &routev1.Route{}: { - Label: labelSelector, - }, - &clusterv1.ManagedCluster{}: { - Label: labels.SelectorFromSet(labels.Set{"vendor": "OpenShift"}), - }, - &workv1.ManifestWork{}: { - Label: labelSelector, - }, - &addonv1alpha1.ClusterManagementAddOn{}: { - Label: labelSelector, - }, - &addonv1alpha1.ManagedClusterAddOn{}: { - Label: labelSelector, - }, - &admissionregistrationv1.MutatingWebhookConfiguration{}: { - Label: labelSelector, - }, - &promv1.ServiceMonitor{}: { - Label: labelSelector, - }, - &subv1alpha1.Subscription{}: {}, - &corev1.PersistentVolumeClaim{}: { - Field: fields.OneTermEqualSelector(namespacePath, utils.GetDefaultNamespace()), - }, - &mchv1.MultiClusterHub{}: {}, - &apiextensionsv1.CustomResourceDefinition{}: { - Field: fields.SelectorFromSet(fields.Set{"metadata.name": "kafkas.kafka.strimzi.io"}), - }, - } - return cache.New(config, cacheOpts) -} diff --git a/operator/main_test.go b/operator/main_test.go index 31d218a63..e8b83e8b6 100644 --- a/operator/main_test.go +++ b/operator/main_test.go @@ -20,6 +20,7 @@ import ( "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/envtest" + "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -99,7 +100,7 @@ var _ = Describe("Start Operator Test", Ordered, func() { operatorConfig := parseFlags() Expect(operatorConfig.LeaderElection).To(BeTrue()) - electionConfig, err := getElectionConfig(nil) + electionConfig, err := config.GetElectionConfig() Expect(err).NotTo(HaveOccurred()) Expect(electionConfig.LeaseDuration).To(Equal(137)) @@ -128,7 +129,7 @@ var _ = Describe("Start Operator Test", Ordered, func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - configMap, err := kubeClient.CoreV1().ConfigMaps( + _, err := kubeClient.CoreV1().ConfigMaps( utils.GetDefaultNamespace()).Create(ctx, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -138,7 +139,10 @@ var _ = Describe("Start Operator Test", Ordered, func() { Data: map[string]string{"leaseDuration": "10", "renewDeadline": "8", "retryPeriod": "2"}, }, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - electionConfig, err := getElectionConfig(configMap) + err = config.LoadControllerConfig(ctx, kubeClient) + Expect(err).NotTo(HaveOccurred()) + + electionConfig, err := config.GetElectionConfig() Expect(err).NotTo(HaveOccurred()) Expect(electionConfig.LeaseDuration).To(Equal(10)) diff --git a/operator/pkg/config/cache_config.go b/operator/pkg/config/cache_config.go new file mode 100644 index 000000000..cec162919 --- /dev/null +++ b/operator/pkg/config/cache_config.go @@ -0,0 +1,120 @@ +package config + +import ( + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/stolostron/multicluster-global-hub/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/utils" +) + +var labelSelector = labels.SelectorFromSet( + labels.Set{ + constants.GlobalHubOwnerLabelKey: constants.GHOperatorOwnerLabelVal, + }, +) + +func InitCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, 
error) {
+	cacheOpts.ByObject = map[client.Object]cache.ByObject{
+		// addon installer: transport credentials and image pull secret
+		// global hub controller
+		&corev1.Secret{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {},
+			},
+		},
+		// global hub condition controller(status changed)
+		// global hub controller(spec changed)
+		&appsv1.Deployment{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		// global hub controller - postgres
+		&appsv1.StatefulSet{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		// global hub controller
+		&corev1.Service{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		// global hub controller
+		&corev1.ServiceAccount{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		// global hub controller: postgresCA and custom alert
+		&corev1.ConfigMap{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {},
+			},
+		},
+		// global hub controller
+		&rbacv1.Role{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		&rbacv1.RoleBinding{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		&rbacv1.ClusterRole{}: {
+			Label: labelSelector,
+		},
+		&rbacv1.ClusterRoleBinding{}: {
+			Label: labelSelector,
+		},
+		&corev1.Namespace{}: {
+			Field: fields.SelectorFromSet(
+				fields.Set{
+					"metadata.name": utils.GetDefaultNamespace(),
+				},
+			),
+		},
+		&corev1.PersistentVolumeClaim{}: {
+			Namespaces: map[string]cache.Config{
+				utils.GetDefaultNamespace(): {LabelSelector: labelSelector},
+			},
+		},
+		&admissionregistrationv1.MutatingWebhookConfiguration{}: {
+			Label: labelSelector,
+		},
+
+		// Enabling the following cache options won't work on clusters where the following resources are not installed
+		// // addon installer, global hub controller
+		// &clusterv1.ManagedCluster{}: {
+		// 	Label: labels.SelectorFromSet(labels.Set{"vendor": "OpenShift"}),
+		// },
+		// // addon installer, global hub controller
+		// &addonv1alpha1.ClusterManagementAddOn{}: {
+		// 	Label: labelSelector,
+		// },
+		// // addon installer
+		// &addonv1alpha1.ManagedClusterAddOn{}: {
+		// 	Label: labelSelector,
+		// },
+		// // global hub controller
+		// &promv1.ServiceMonitor{}: {
+		// 	Label: labelSelector,
+		// },
+		// // global hub controller
+		// &subv1alpha1.Subscription{}: {},
+		// // backup controller
+		// &mchv1.MultiClusterHub{}: {},
+	}
+	return cache.New(config, cacheOpts)
+}
diff --git a/operator/pkg/config/controller_config.go b/operator/pkg/config/controller_config.go
new file mode 100644
index 000000000..53f03e590
--- /dev/null
+++ b/operator/pkg/config/controller_config.go
@@ -0,0 +1,97 @@
+package config
+
+import (
+	"context"
+	"strconv"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
+	"github.com/stolostron/multicluster-global-hub/pkg/constants"
+	commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
+	"github.com/stolostron/multicluster-global-hub/pkg/utils"
+)
+
+var controllerConfigMap *corev1.ConfigMap
+
+func LoadControllerConfig(ctx context.Context,
kubeClient kubernetes.Interface) error {
+	config, err := kubeClient.CoreV1().ConfigMaps(utils.GetDefaultNamespace()).Get(
+		ctx, operatorconstants.ControllerConfig, metav1.GetOptions{})
+	if err != nil && !errors.IsNotFound(err) {
+		return err
+	}
+	controllerConfigMap = config
+	return nil
+}
+
+func SetControllerConfig(configMap *corev1.ConfigMap) {
+	controllerConfigMap = configMap
+}
+
+func GetElectionConfig() (*commonobjects.LeaderElectionConfig, error) {
+	config := &commonobjects.LeaderElectionConfig{
+		LeaseDuration: 137,
+		RenewDeadline: 107,
+		RetryPeriod:   26,
+	}
+	if controllerConfigMap == nil {
+		return config, nil
+	}
+	val, leaseDurationExist := controllerConfigMap.Data["leaseDuration"]
+	if leaseDurationExist {
+		leaseDurationSec, err := strconv.Atoi(val)
+		if err != nil {
+			return nil, err
+		}
+		config.LeaseDuration = leaseDurationSec
+	}
+
+	val, renewDeadlineExist := controllerConfigMap.Data["renewDeadline"]
+	if renewDeadlineExist {
+		renewDeadlineSec, err := strconv.Atoi(val)
+		if err != nil {
+			return nil, err
+		}
+		config.RenewDeadline = renewDeadlineSec
+	}
+
+	val, retryPeriodExist := controllerConfigMap.Data["retryPeriod"]
+	if retryPeriodExist {
+		retryPeriodSec, err := strconv.Atoi(val)
+		if err != nil {
+			return nil, err
+		}
+		config.RetryPeriod = retryPeriodSec
+	}
+
+	return config, nil
+}
+
+// GetAgentRestConfig returns the agent QPS and burst; if they are not set, the default QPS and burst are used
+func GetAgentRestConfig() (float32, int) {
+	if controllerConfigMap == nil {
+		return constants.DefaultAgentQPS, constants.DefaultAgentBurst
+	}
+
+	agentQPS := constants.DefaultAgentQPS
+	_, agentQPSExist := controllerConfigMap.Data["agentQPS"]
+	if agentQPSExist {
+		agentQPSInt, err := strconv.Atoi(controllerConfigMap.Data["agentQPS"])
+		if err == nil {
+			agentQPS = float32(agentQPSInt)
+		}
+	}
+
+	agentBurst := constants.DefaultAgentBurst
+	_, agentBurstExist := controllerConfigMap.Data["agentBurst"]
+	if agentBurstExist {
+		agentBurstAtoi, err := strconv.Atoi(controllerConfigMap.Data["agentBurst"])
+		if err == nil {
+			agentBurst = agentBurstAtoi
+		}
+	}
+	return agentQPS, agentBurst
+}
diff --git a/operator/pkg/config/multiclusterglobalhub_config.go b/operator/pkg/config/multiclusterglobalhub_config.go
index 5f2d3ffa1..e9c1ae2e3 100644
--- a/operator/pkg/config/multiclusterglobalhub_config.go
+++ b/operator/pkg/config/multiclusterglobalhub_config.go
@@ -34,7 +34,6 @@ import (
 	operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
 	"github.com/stolostron/multicluster-global-hub/operator/pkg/postgres"
 	"github.com/stolostron/multicluster-global-hub/pkg/constants"
-	"github.com/stolostron/multicluster-global-hub/pkg/transport"
 	"github.com/stolostron/multicluster-global-hub/pkg/utils"
 )
 
@@ -66,7 +65,6 @@ const (
 )
 
 var (
-	managedClusters    = []string{}
 	mghNamespacedName  = types.NamespacedName{}
 	oauthSessionSecret = ""
 	imageOverrides     = map[string]string{
@@ -79,7 +77,6 @@ var (
 	statisticLogInterval  = "1m"
 	metricsScrapeInterval = "1m"
 	imagePullSecretName   = ""
-	transporter           transport.Transporter
 )
 
 func SetMGHNamespacedName(namespacedName types.NamespacedName) {
@@ -183,29 +180,6 @@ func GetImage(componentName string) string {
 	return imageOverrides[componentName]
 }
 
-// cache the managed clusters
-func AppendManagedCluster(name string) {
-	for index := range managedClusters {
-		if managedClusters[index] == name {
-			return
-		}
-	}
-	managedClusters = append(managedClusters, name)
-}
-
-func DeleteManagedCluster(name string) {
-	for index := range
managedClusters { - if managedClusters[index] == name { - managedClusters = append(managedClusters[:index], managedClusters[index+1:]...) - return - } - } -} - -func GetManagedClusters() []string { - return managedClusters -} - func SetStatisticLogInterval(mgh *globalhubv1alpha4.MulticlusterGlobalHub) error { interval := getAnnotation(mgh, operatorconstants.AnnotationStatisticInterval) if interval == "" { @@ -239,14 +213,6 @@ func GetPostgresStorageSize(mgh *globalhubv1alpha4.MulticlusterGlobalHub) string return GHPostgresDefaultStorageSize } -func GetKafkaStorageSize(mgh *globalhubv1alpha4.MulticlusterGlobalHub) string { - defaultKafkaStorageSize := "10Gi" - if mgh.Spec.DataLayer.Kafka.StorageSize != "" { - return mgh.Spec.DataLayer.Kafka.StorageSize - } - return defaultKafkaStorageSize -} - func SetImagePullSecretName(mgh *globalhubv1alpha4.MulticlusterGlobalHub) { if mgh.Spec.ImagePullSecret != imagePullSecretName { imagePullSecretName = mgh.Spec.ImagePullSecret @@ -257,14 +223,6 @@ func GetImagePullSecretName() string { return imagePullSecretName } -func SetTransporter(p transport.Transporter) { - transporter = p -} - -func GetTransporter() transport.Transporter { - return transporter -} - // GeneratePGConnectionFromGHStorageSecret returns a postgres connection from the GH storage secret func GetPGConnectionFromGHStorageSecret(ctx context.Context, client client.Client) ( *postgres.PostgresConnection, error, diff --git a/operator/pkg/config/operator_config.go b/operator/pkg/config/operator_config.go new file mode 100644 index 000000000..2a691b4eb --- /dev/null +++ b/operator/pkg/config/operator_config.go @@ -0,0 +1,10 @@ +package config + +type OperatorConfig struct { + MetricsAddress string + ProbeAddress string + PodNamespace string + LeaderElection bool + GlobalResourceEnabled bool + LogLevel string +} diff --git a/operator/pkg/config/scheme_config.go b/operator/pkg/config/scheme_config.go new file mode 100644 index 000000000..552b8cb49 --- /dev/null +++ b/operator/pkg/config/scheme_config.go @@ -0,0 +1,60 @@ +package config + +import ( + kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" + routev1 "github.com/openshift/api/route/v1" + subv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + operatorsv1 "github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apis/operators/v1" + promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + agentv1 "github.com/stolostron/klusterlet-addon-controller/pkg/apis/agent/v1" + mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" + clusterv1 "open-cluster-management.io/api/cluster/v1" + clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" + clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2" + workv1 "open-cluster-management.io/api/work/v1" + policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + chnv1 "open-cluster-management.io/multicloud-operators-channel/pkg/apis/apps/v1" + placementrulesv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" + appsubv1 
"open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1" + appsubV1alpha1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1alpha1" + applicationv1beta1 "sigs.k8s.io/application/api/v1beta1" + + globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" +) + +func GetRuntimeScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(routev1.AddToScheme(scheme)) + utilruntime.Must(operatorsv1.AddToScheme(scheme)) + utilruntime.Must(clusterv1.AddToScheme(scheme)) + utilruntime.Must(clusterv1beta1.AddToScheme(scheme)) + utilruntime.Must(clusterv1beta2.AddToScheme(scheme)) + utilruntime.Must(workv1.AddToScheme(scheme)) + utilruntime.Must(addonv1alpha1.AddToScheme(scheme)) + utilruntime.Must(globalhubv1alpha4.AddToScheme(scheme)) + utilruntime.Must(appsubv1.SchemeBuilder.AddToScheme(scheme)) + utilruntime.Must(appsubV1alpha1.AddToScheme(scheme)) + utilruntime.Must(subv1alpha1.AddToScheme(scheme)) + utilruntime.Must(chnv1.AddToScheme(scheme)) + utilruntime.Must(placementrulesv1.AddToScheme(scheme)) + utilruntime.Must(policyv1.AddToScheme(scheme)) + utilruntime.Must(applicationv1beta1.AddToScheme(scheme)) + utilruntime.Must(admissionregistrationv1.AddToScheme(scheme)) + utilruntime.Must(mchv1.AddToScheme(scheme)) + utilruntime.Must(agentv1.SchemeBuilder.AddToScheme(scheme)) + utilruntime.Must(promv1.AddToScheme(scheme)) + utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) + + // add Kafka scheme + utilruntime.Must(kafkav1beta2.AddToScheme(scheme)) + // +kubebuilder:scaffold:scheme + + return scheme +} diff --git a/operator/pkg/config/transport_config.go b/operator/pkg/config/transport_config.go new file mode 100644 index 000000000..f1a5381dc --- /dev/null +++ b/operator/pkg/config/transport_config.go @@ -0,0 +1,35 @@ +package config + +import ( + globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" + "github.com/stolostron/multicluster-global-hub/pkg/transport" +) + +var ( + transporter transport.Transporter + kafkaResourceReady = false +) + +func SetTransporter(p transport.Transporter) { + transporter = p +} + +func GetTransporter() transport.Transporter { + return transporter +} + +func GetKafkaResourceReady() bool { + return kafkaResourceReady +} + +func SetKafkaResourceReady(ready bool) { + kafkaResourceReady = ready +} + +func GetKafkaStorageSize(mgh *globalhubv1alpha4.MulticlusterGlobalHub) string { + defaultKafkaStorageSize := "10Gi" + if mgh.Spec.DataLayer.Kafka.StorageSize != "" { + return mgh.Spec.DataLayer.Kafka.StorageSize + } + return defaultKafkaStorageSize +} diff --git a/operator/pkg/controllers/addon/addon_controller.go b/operator/pkg/controllers/addon/addon_controller.go index 41414f4eb..06c4554cf 100644 --- a/operator/pkg/controllers/addon/addon_controller.go +++ b/operator/pkg/controllers/addon/addon_controller.go @@ -8,7 +8,6 @@ import ( operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" @@ -21,10 +20,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" + 
"github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" - hubofhubs "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" - commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" ) // +kubebuilder:rbac:groups=cluster.open-cluster-management.io,resources=managedclusters,verbs=get;list;watch @@ -42,43 +40,35 @@ import ( // +kubebuilder:rbac:groups="",resources=events,verbs=create;update;get;list;watch;delete;deletecollection;patch // +kubebuilder:rbac:groups=packages.operators.coreos.com,resources=packagemanifests,verbs=get;list;watch -type HoHAddonController struct { +type AddonController struct { + addonManager addonmanager.AddonManager kubeConfig *rest.Config client client.Client - leaderElection *commonobjects.LeaderElectionConfig log logr.Logger - addonManager addonmanager.AddonManager - MiddlewareConfig *hubofhubs.MiddlewareConfig EnableGlobalResource bool - ControllerConfig *corev1.ConfigMap LogLevel string } // used to create addon manager -func NewHoHAddonController(kubeConfig *rest.Config, client client.Client, - leaderElection *commonobjects.LeaderElectionConfig, middlewareCfg *hubofhubs.MiddlewareConfig, - enableGlobalResource bool, controllerConfig *corev1.ConfigMap, logLevel string, -) (*HoHAddonController, error) { +func NewAddonController(kubeConfig *rest.Config, client client.Client, operatorConfig *config.OperatorConfig, +) (*AddonController, error) { log := ctrl.Log.WithName("addon-controller") addonMgr, err := addonmanager.New(kubeConfig) if err != nil { log.Error(err, "failed to create addon manager") return nil, err } - return &HoHAddonController{ + return &AddonController{ kubeConfig: kubeConfig, client: client, - leaderElection: leaderElection, log: log, addonManager: addonMgr, - MiddlewareConfig: middlewareCfg, - EnableGlobalResource: enableGlobalResource, - ControllerConfig: controllerConfig, - LogLevel: logLevel, + EnableGlobalResource: operatorConfig.GlobalResourceEnabled, + LogLevel: operatorConfig.LogLevel, }, nil } -func (a *HoHAddonController) Start(ctx context.Context) error { +func (a *AddonController) Start(ctx context.Context) error { addonScheme := runtime.NewScheme() utilruntime.Must(mchv1.AddToScheme(addonScheme)) utilruntime.Must(globalhubv1alpha4.AddToScheme(addonScheme)) @@ -107,11 +97,8 @@ func (a *HoHAddonController) Start(ctx context.Context) error { kubeClient: kubeClient, client: a.client, dynamicClient: dynamicClient, - leaderElectionConfig: a.leaderElection, log: a.log.WithName("values"), - MiddlewareConfig: a.MiddlewareConfig, EnableGlobalResource: a.EnableGlobalResource, - ControllerConfig: a.ControllerConfig, LogLevel: a.LogLevel, } _, err = utils.WaitGlobalHubReady(ctx, a.client, 5*time.Second) @@ -145,6 +132,6 @@ func (a *HoHAddonController) Start(ctx context.Context) error { return a.addonManager.Start(ctx) } -func (a *HoHAddonController) AddonManager() addonmanager.AddonManager { +func (a *AddonController) AddonManager() addonmanager.AddonManager { return a.addonManager } diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index d40ea5fb9..2af8b219e 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -23,10 +23,8 @@ import ( globalhubv1alpha4 
"github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" - hubofhubscontroller "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" - commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" "github.com/stolostron/multicluster-global-hub/pkg/transport" ) @@ -87,11 +85,8 @@ type HohAgentAddon struct { client client.Client kubeClient kubernetes.Interface dynamicClient dynamic.Interface - leaderElectionConfig *commonobjects.LeaderElectionConfig log logr.Logger - MiddlewareConfig *hubofhubscontroller.MiddlewareConfig EnableGlobalResource bool - ControllerConfig *corev1.ConfigMap LogLevel string } @@ -183,7 +178,7 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, imagePullPolicy = mgh.Spec.ImagePullPolicy } - agentQPS, agentBurst := a.getAgentRestConfig(a.ControllerConfig) + agentQPS, agentBurst := config.GetAgentRestConfig() err = utils.WaitTransporterReady(a.ctx, 10*time.Minute) if err != nil { @@ -209,6 +204,11 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, log.Error(err, "failed to unmarshal to agent resources") } + electionConfig, err := config.GetElectionConfig() + if err != nil { + log.Error(err, "failed to get election config") + } + manifestsConfig := ManifestsConfig{ HoHAgentImage: image, ImagePullPolicy: string(imagePullPolicy), @@ -222,9 +222,9 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, KafkaEventTopic: clusterTopic.EventTopic, MessageCompressionType: string(operatorconstants.GzipCompressType), TransportType: string(transport.Kafka), - LeaseDuration: strconv.Itoa(a.leaderElectionConfig.LeaseDuration), - RenewDeadline: strconv.Itoa(a.leaderElectionConfig.RenewDeadline), - RetryPeriod: strconv.Itoa(a.leaderElectionConfig.RetryPeriod), + LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), + RenewDeadline: strconv.Itoa(electionConfig.RenewDeadline), + RetryPeriod: strconv.Itoa(electionConfig.RetryPeriod), KlusterletNamespace: "open-cluster-management-agent", KlusterletWorkSA: "klusterlet-work-sa", EnableGlobalResource: a.EnableGlobalResource, @@ -259,35 +259,6 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, return addonfactory.StructToValues(manifestsConfig), nil } -// getAgentRestConfig return the agent qps and burst, if not set, use default QPS and Burst -func (a *HohAgentAddon) getAgentRestConfig(configMap *corev1.ConfigMap) (float32, int) { - if configMap == nil { - a.log.V(4).Info("controller configmap is nil, Use default agentQPS", fmt.Sprintf("%f", constants.DefaultAgentQPS), - "AgentBurst:", fmt.Sprintf("%v", constants.DefaultAgentBurst)) - return constants.DefaultAgentQPS, constants.DefaultAgentBurst - } - agentQPS := constants.DefaultAgentQPS - _, agentQPSExist := configMap.Data["agentQPS"] - if agentQPSExist { - agentQPSInt, err := strconv.Atoi(configMap.Data["agentQPS"]) - if err == nil { - agentQPS = float32(agentQPSInt) - } - } - - agentBurst := constants.DefaultAgentBurst - _, agentBurstExist := configMap.Data["agentBurst"] - if agentBurstExist { - agentBurstAtoi, err := strconv.Atoi(configMap.Data["agentBurst"]) - if err == nil { - agentBurst = agentBurstAtoi - } - } - a.log.V(4).Info("AgentQPS:", fmt.Sprintf("%f", agentQPS), 
"AgentBurst:", fmt.Sprintf("%v", agentBurst)) - - return agentQPS, agentBurst -} - // GetImagePullSecret returns the image pull secret name and data func (a *HohAgentAddon) setImagePullSecret(mgh *globalhubv1alpha4.MulticlusterGlobalHub, cluster *clusterv1.ManagedCluster, manifestsConfig *ManifestsConfig, diff --git a/operator/pkg/controllers/addon/addon_controller_manifests_test.go b/operator/pkg/controllers/addon/addon_controller_manifests_test.go index ccd5bb574..4758ac835 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests_test.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests_test.go @@ -1,12 +1,12 @@ package addon import ( - "context" "testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" ) @@ -85,10 +85,8 @@ func TestHohAgentAddon_getAgentRestConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := &HohAgentAddon{ - ctx: context.Background(), - } - gotQPS, gotBurst := a.getAgentRestConfig(tt.configmap) + config.SetControllerConfig(tt.configmap) + gotQPS, gotBurst := config.GetAgentRestConfig() if gotQPS != tt.wantQPS { t.Errorf("HohAgentAddon.getAgentRestConfig() got = %v, want %v", gotQPS, tt.wantQPS) } diff --git a/operator/pkg/controllers/addon/addon_installer.go b/operator/pkg/controllers/addon/addon_installer.go index 2fca758ba..446045af4 100644 --- a/operator/pkg/controllers/addon/addon_installer.go +++ b/operator/pkg/controllers/addon/addon_installer.go @@ -29,12 +29,12 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/constants" ) -type HoHAddonInstaller struct { +type AddonInstaller struct { client.Client Log logr.Logger } -func (r *HoHAddonInstaller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *AddonInstaller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { mgh, err := utils.WaitGlobalHubReady(ctx, r, 5*time.Second) if err != nil { return ctrl.Result{}, err @@ -86,14 +86,13 @@ func (r *HoHAddonInstaller) Reconcile(ctx context.Context, req ctrl.Request) (ct if err := r.removeResourcesAndAddon(ctx, cluster); err != nil { return ctrl.Result{}, fmt.Errorf("failed to remove resources and addon %s: %v", cluster.Name, err) } - config.DeleteManagedCluster(cluster.Name) return ctrl.Result{}, nil } return ctrl.Result{}, r.reconclieAddonAndResources(ctx, cluster) } -func (r *HoHAddonInstaller) reconclieAddonAndResources(ctx context.Context, cluster *clusterv1.ManagedCluster) error { +func (r *AddonInstaller) reconclieAddonAndResources(ctx context.Context, cluster *clusterv1.ManagedCluster) error { existingAddon := &v1alpha1.ManagedClusterAddOn{ ObjectMeta: metav1.ObjectMeta{ Name: operatorconstants.GHManagedClusterAddonName, @@ -110,7 +109,6 @@ func (r *HoHAddonInstaller) reconclieAddonAndResources(ctx context.Context, clus if err != nil { if errors.IsNotFound(err) { r.Log.Info("creating resourcs and addon", "cluster", cluster.Name, "addon", existingAddon.Name) - config.AppendManagedCluster(cluster.Name) return r.createResourcesAndAddon(ctx, cluster) } else { return fmt.Errorf("failed to get the addon: %v", err) @@ -120,7 +118,6 @@ func (r *HoHAddonInstaller) reconclieAddonAndResources(ctx context.Context, clus // delete if !existingAddon.DeletionTimestamp.IsZero() { r.Log.Info("deleting resourcs and addon", "cluster", cluster.Name, "addon", 
existingAddon.Name)
-		config.DeleteManagedCluster(cluster.Name)
 		return r.removeResourcesAndAddon(ctx, cluster)
 	}
 
@@ -140,7 +137,7 @@ func (r *HoHAddonInstaller) reconclieAddonAndResources(ctx context.Context, clus
 	return nil
 }
 
-func (r *HoHAddonInstaller) updateKafkaResource(cluster *clusterv1.ManagedCluster) error {
+func (r *AddonInstaller) updateKafkaResource(cluster *clusterv1.ManagedCluster) error {
 	transporter := config.GetTransporter()
 	clusterUser := transporter.GenerateUserName(cluster.Name)
 	clusterTopic := transporter.GenerateClusterTopic(cluster.Name)
@@ -165,7 +162,7 @@ func (r *HoHAddonInstaller) updateKafkaResource(cluster *clusterv1.ManagedCluste
 	return nil
 }
 
-func (r *HoHAddonInstaller) createResourcesAndAddon(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
+func (r *AddonInstaller) createResourcesAndAddon(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
 	expectedAddon, err := expectedManagedClusterAddon(cluster)
 	if err != nil {
 		return err
@@ -177,7 +174,7 @@ func (r *HoHAddonInstaller) createResourcesAndAddon(ctx context.Context, cluster
 	return nil
 }
 
-func (r *HoHAddonInstaller) removeResourcesAndAddon(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
+func (r *AddonInstaller) removeResourcesAndAddon(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
 	// should remove the addon first, otherwise it mightn't update the mainfiest work for the addon
 	existingAddon := &v1alpha1.ManagedClusterAddOn{
 		ObjectMeta: metav1.ObjectMeta{
@@ -197,7 +194,7 @@ func (r *HoHAddonInstaller) removeResourcesAndAddon(ctx context.Context, cluster
 	return nil
 }
 
-func (r *HoHAddonInstaller) removeResources(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
+func (r *AddonInstaller) removeResources(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
 	transporter := config.GetTransporter()
 	clusterUser := transporter.GenerateUserName(cluster.Name)
 	clusterTopic := transporter.GenerateClusterTopic(cluster.Name)
@@ -247,7 +244,7 @@ func expectedManagedClusterAddon(cluster *clusterv1.ManagedCluster) (*v1alpha1.M
 }
 
 // SetupWithManager sets up the controller with the Manager.
-func (r *HoHAddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
+func (r *AddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
 	clusterPred := predicate.Funcs{
 		CreateFunc: func(e event.CreateEvent) bool {
 			return !filterManagedCluster(e.Object)
@@ -323,9 +320,18 @@ func (r *HoHAddonInstaller) SetupWithManag
 		},
 	}
 
+	// TODO: investigate why the dynamic cache cannot work
+	// acmCache, err := config.ACMCache(mgr)
+	// if err != nil {
+	// 	return err
+	// }
+
 	return ctrl.NewControllerManagedBy(mgr).
+		Named("addonInstaller").
 		// primary watch for managedcluster
-		For(&clusterv1.ManagedCluster{}, builder.WithPredicates(clusterPred)).
+		Watches(&clusterv1.ManagedCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(clusterPred)).
+		// WatchesRawSource(source.Kind(acmCache, &clusterv1.ManagedCluster{}),
+		// 	&handler.EnqueueRequestForObject{}, builder.WithPredicates(clusterPred)).
// secondary watch for managedclusteraddon Watches(&v1alpha1.ManagedClusterAddOn{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { @@ -340,26 +346,43 @@ func (r *HoHAddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manag Watches(&v1alpha1.ClusterManagementAddOn{}, handler.EnqueueRequestsFromMapFunc(r.renderAllManifestsHandler), builder.WithPredicates(clusterManagementAddonPred)). - Watches(&corev1.Secret{}, + // secondary watch for transport credentials or image pull secret + Watches(&corev1.Secret{}, // the cache is set in manager handler.EnqueueRequestsFromMapFunc(r.renderAllManifestsHandler), builder.WithPredicates(secretPred)). Complete(r) } -func (r *HoHAddonInstaller) renderAllManifestsHandler( +func (r *AddonInstaller) renderAllManifestsHandler( ctx context.Context, obj client.Object, ) []reconcile.Request { requests := []reconcile.Request{} - // list all the managedCluster + + hubNames, err := GetAllManagedHubNames(ctx, r.Client) + if err != nil { + r.Log.Error(err, "failed to list managed clusters to trigger addoninstall reconciler") + return requests + } + for _, name := range hubNames { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: name, + }, + }) + } + r.Log.Info("triggers addoninstall reconciler for all managed clusters", "requests", len(requests)) + return requests +} + +func GetAllManagedHubNames(ctx context.Context, c client.Client) ([]string, error) { + names := []string{} managedClusterList := &clusterv1.ManagedClusterList{} - err := r.List(ctx, managedClusterList) + err := c.List(ctx, managedClusterList) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("no managed cluster found to trigger addoninstall reconciler") - return requests + return names, nil } - r.Log.Error(err, "failed to list managed clusters to trigger addoninstall reconciler") - return requests + return nil, err } for i := range managedClusterList.Items { @@ -367,14 +390,9 @@ func (r *HoHAddonInstaller) renderAllManifestsHandler( if filterManagedCluster(&managedCluster) { continue } - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: managedCluster.GetName(), - }, - }) + names = append(names, managedCluster.GetName()) } - r.Log.Info("triggers addoninstall reconciler for all managed clusters", "requests", len(requests)) - return requests + return names, nil } func filterManagedCluster(obj client.Object) bool { diff --git a/operator/pkg/controllers/addon/addon_installer_test.go b/operator/pkg/controllers/addon/addon_installer_test.go index f92580e76..86fc82a5e 100644 --- a/operator/pkg/controllers/addon/addon_installer_test.go +++ b/operator/pkg/controllers/addon/addon_installer_test.go @@ -4,16 +4,14 @@ import ( "context" "fmt" "os" + "path/filepath" "strings" "testing" - kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "open-cluster-management.io/api/addon/v1alpha1" v1 "open-cluster-management.io/api/cluster/v1" @@ -37,7 +35,13 @@ var kubeCfg *rest.Config func TestMain(m *testing.M) { // start testEnv - testEnv := &envtest.Environment{} + testEnv := &envtest.Environment{ + CRDDirectoryPaths: []string{ + filepath.Join("..", "..", "..", "config", 
"crd", "bases"), + filepath.Join("..", "..", "..", "..", "pkg", "testdata", "crds"), + }, + ErrorIfCRDPathMissing: true, + } var err error kubeCfg, err = testEnv.Start() if err != nil { @@ -126,12 +130,6 @@ func fakeHoHAddon(cluster, installNamespace, addonDeployMode string) *v1alpha1.M } func TestHoHAddonReconciler(t *testing.T) { - addonTestScheme := scheme.Scheme - utilruntime.Must(v1.AddToScheme(addonTestScheme)) - utilruntime.Must(v1alpha1.AddToScheme(addonTestScheme)) - utilruntime.Must(operatorv1alpha4.AddToScheme(addonTestScheme)) - utilruntime.Must(kafkav1beta2.AddToScheme(addonTestScheme)) - namespace := "default" name := "test" config.SetMGHNamespacedName(types.NamespacedName{ @@ -270,13 +268,18 @@ func TestHoHAddonReconciler(t *testing.T) { config.SetMGHNamespacedName(types.NamespacedName{Namespace: "", Name: ""}) } mgr, err := ctrl.NewManager(kubeCfg, ctrl.Options{ + Scheme: config.GetRuntimeScheme(), Metrics: metricsserver.Options{ BindAddress: "0", // disable the metrics serving }, + NewCache: config.InitCache, }) if err != nil { t.Errorf("failed to create manager: %v", err) } + if mgr == nil { + t.Error("the mgr shouldn't be nil") + } objects = append(objects, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-kafka-user", tc.cluster.Name), @@ -293,8 +296,8 @@ func TestHoHAddonReconciler(t *testing.T) { }, k8sClient) config.SetTransporter(transporter) - r := &hubofhubsaddon.HoHAddonInstaller{ - Client: fake.NewClientBuilder().WithScheme(addonTestScheme).WithObjects(objects...).Build(), + r := &hubofhubsaddon.AddonInstaller{ + Client: fake.NewClientBuilder().WithScheme(mgr.GetScheme()).WithObjects(objects...).Build(), Log: ctrl.Log.WithName("test"), } err = r.SetupWithManager(ctx, mgr) diff --git a/operator/pkg/controllers/addon/addon_integration_test.go b/operator/pkg/controllers/addon/addon_integration_test.go index 0169cc755..7bb3f17da 100644 --- a/operator/pkg/controllers/addon/addon_integration_test.go +++ b/operator/pkg/controllers/addon/addon_integration_test.go @@ -110,6 +110,8 @@ var _ = Describe("addon integration", Ordered, func() { []clusterv1.ManagedClusterClaim{}, clusterAvailableCondition) + fmt.Println("prepared cluster .........................................") + By("By checking the addon CR is created in the cluster ns") addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { diff --git a/operator/pkg/controllers/addon/addon_suite_test.go b/operator/pkg/controllers/addon/addon_suite_test.go index 5a5f3d7b4..8d05d1842 100644 --- a/operator/pkg/controllers/addon/addon_suite_test.go +++ b/operator/pkg/controllers/addon/addon_suite_test.go @@ -15,34 +15,18 @@ package addon_test import ( "context" - "encoding/base64" "os" "path/filepath" - "strconv" "testing" "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - operatorsv1 "github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apis/operators/v1" - agentv1 "github.com/stolostron/klusterlet-addon-controller/pkg/apis/agent/v1" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" - clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2" - workv1 "open-cluster-management.io/api/work/v1" - chnv1 "open-cluster-management.io/multicloud-operators-channel/pkg/apis/apps/v1" - placementrulesv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" - appsubv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1" - appsubV1alpha1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/v1alpha1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -53,13 +37,9 @@ import ( globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/condition" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon" - hubofhubscontroller "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs" transportprotocol "github.com/stolostron/multicluster-global-hub/operator/pkg/transporter" "github.com/stolostron/multicluster-global-hub/pkg/constants" - commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" - "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/utils" "github.com/stolostron/multicluster-global-hub/test/pkg/kafka" ) @@ -102,36 +82,8 @@ var _ = BeforeSuite(func() { Expect(cfg).NotTo(BeNil()) // add scheme - err = operatorsv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = clusterv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = clusterv1beta1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = clusterv1beta2.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = workv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = addonv1alpha1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = appsubv1.SchemeBuilder.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = appsubV1alpha1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = chnv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = placementrulesv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = globalhubv1alpha4.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = appsv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - err = agentv1.SchemeBuilder.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - // +kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) 
+ runtimeScheme := config.GetRuntimeScheme() + k8sClient, err = client.New(cfg, client.Options{Scheme: runtimeScheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) @@ -140,12 +92,14 @@ var _ = BeforeSuite(func() { k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ Metrics: metricsserver.Options{ BindAddress: "0", // disable the metrics serving - }, Scheme: scheme.Scheme, + }, + Scheme: runtimeScheme, + NewCache: config.InitCache, }) Expect(err).ToNot(HaveOccurred()) By("Add the addon installer to the manager") - err = (&addon.HoHAddonInstaller{ + err = (&addon.AddonInstaller{ Client: k8sClient, Log: ctrl.Log.WithName("addon install controller"), }).SetupWithManager(ctx, k8sManager) @@ -153,20 +107,14 @@ var _ = BeforeSuite(func() { kubeClient, err := kubernetes.NewForConfig(k8sManager.GetConfig()) Expect(err).ToNot(HaveOccurred()) - electionConfig, err := getElectionConfig(kubeClient) + err = config.LoadControllerConfig(ctx, kubeClient) Expect(err).ToNot(HaveOccurred()) By("Add the addon controller to the manager") - middlewareCfg := &hubofhubscontroller.MiddlewareConfig{ - TransportConn: &transport.ConnCredential{ - BootstrapServer: kafka.KafkaBootstrapServer, - CACert: base64.StdEncoding.EncodeToString([]byte(kafka.KafkaCA)), - ClientCert: kafka.KafkaClientCert, - ClientKey: kafka.KafkaClientKey, - }, - } - addonController, err := addon.NewHoHAddonController(k8sManager.GetConfig(), k8sClient, - electionConfig, middlewareCfg, true, &corev1.ConfigMap{}, "info") + addonController, err := addon.NewAddonController(k8sManager.GetConfig(), k8sClient, &config.OperatorConfig{ + GlobalResourceEnabled: true, + LogLevel: "info", + }) Expect(err).ToNot(HaveOccurred()) err = k8sManager.Add(addonController) Expect(err).ToNot(HaveOccurred()) @@ -283,40 +231,3 @@ func prepareBeforeTest() { }, k8sClient) config.SetTransporter(transporter) } - -func getElectionConfig(kubeClient *kubernetes.Clientset) (*commonobjects.LeaderElectionConfig, error) { - cfg := &commonobjects.LeaderElectionConfig{ - LeaseDuration: 137, - RenewDeadline: 107, - RetryPeriod: 26, - } - - configMap, err := kubeClient.CoreV1().ConfigMaps(utils.GetDefaultNamespace()).Get( - context.TODO(), operatorconstants.ControllerConfig, metav1.GetOptions{}) - if errors.IsNotFound(err) { - return cfg, nil - } - if err != nil { - return nil, err - } - - leaseDurationSec, err := strconv.Atoi(configMap.Data["leaseDuration"]) - if err != nil { - return nil, err - } - - renewDeadlineSec, err := strconv.Atoi(configMap.Data["renewDeadline"]) - if err != nil { - return nil, err - } - - retryPeriodSec, err := strconv.Atoi(configMap.Data["retryPeriod"]) - if err != nil { - return nil, err - } - - cfg.LeaseDuration = leaseDurationSec - cfg.RenewDeadline = renewDeadlineSec - cfg.RetryPeriod = retryPeriodSec - return cfg, nil -} diff --git a/operator/pkg/controllers/backup/backup_start.go b/operator/pkg/controllers/backup/backup_start.go index 29fae03c2..512a1446b 100644 --- a/operator/pkg/controllers/backup/backup_start.go +++ b/operator/pkg/controllers/backup/backup_start.go @@ -19,7 +19,6 @@ package backup import ( "context" "reflect" - "time" "github.com/go-logr/logr" mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1" @@ -36,7 +35,6 @@ import ( globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - operatorutils "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" 
"github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -61,24 +59,10 @@ func NewBackupReconciler(mgr manager.Manager, log logr.Logger) *BackupReconciler } } -func (r *BackupReconciler) Start(ctx context.Context) error { - // Only when global hub started, then start the backup controller - _, err := operatorutils.WaitGlobalHubReady(ctx, r.Client, 5*time.Second) - if err != nil { - return err - } - - if err = r.SetupWithManager(r.Manager); err != nil { - return err - } - return nil -} - // SetupWithManager sets up the controller with the Manager. func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr).Named("backupController"). - Watches(&globalhubv1alpha4.MulticlusterGlobalHub{}, - objEventHandler, + For(&globalhubv1alpha4.MulticlusterGlobalHub{}, builder.WithPredicates(mghPred)). Watches(&corev1.Secret{}, objEventHandler, @@ -157,18 +141,6 @@ var configmapPred = predicate.Funcs{ }, } -var commonPred = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return !utils.HasItem(e.Object.GetLabels(), constants.BackupKey, constants.BackupActivationValue) - }, - UpdateFunc: func(e event.UpdateEvent) bool { - return !utils.HasItem(e.ObjectNew.GetLabels(), constants.BackupKey, constants.BackupActivationValue) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return false - }, -} - var mchPred = predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { return true diff --git a/operator/pkg/controllers/backup/integration_test.go b/operator/pkg/controllers/backup/integration_test.go index f0227d40f..de4a3e59e 100644 --- a/operator/pkg/controllers/backup/integration_test.go +++ b/operator/pkg/controllers/backup/integration_test.go @@ -131,7 +131,6 @@ var _ = Describe("Backup controller", Ordered, func() { Namespace: mghNamespace, Name: mghName, }, mgh, &client.GetOptions{})).Should(Succeed()) - mgh.Labels = nil Expect(k8sClient.Update(ctx, mgh, &client.UpdateOptions{})) Eventually(func() bool { diff --git a/operator/pkg/controllers/crd/crd_controller.go b/operator/pkg/controllers/crd/crd_controller.go new file mode 100644 index 000000000..f773b5e93 --- /dev/null +++ b/operator/pkg/controllers/crd/crd_controller.go @@ -0,0 +1,195 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
diff --git a/operator/pkg/controllers/crd/crd_controller.go b/operator/pkg/controllers/crd/crd_controller.go
new file mode 100644
index 000000000..f773b5e93
--- /dev/null
+++ b/operator/pkg/controllers/crd/crd_controller.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2024.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package crd
+
+import (
+	"context"
+
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"k8s.io/client-go/kubernetes"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon"
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/backup"
+	globalhubcontrollers "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs"
+)
+
+var ACMCrds = []string{
+	"multiclusterhubs.operator.open-cluster-management.io",
+	"managedclusters.cluster.open-cluster-management.io",
+	"clustermanagementaddons.addon.open-cluster-management.io",
+	"managedclusteraddons.addon.open-cluster-management.io",
+	"manifestworks.work.open-cluster-management.io",
+}
+
+var KafkaCrds = []string{
+	"kafkas.kafka.strimzi.io",
+	"kafkatopics.kafka.strimzi.io",
+	"kafkausers.kafka.strimzi.io",
+}
+
+// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch;update
+
+// CrdController watches the required CRDs; it may be removed once controllers can be added to a started manager:
+// https://github.com/kubernetes-sigs/controller-runtime/pull/2159
+type CrdController struct {
+	manager.Manager
+	kubeClient                        *kubernetes.Clientset
+	operatorConfig                    *config.OperatorConfig
+	resources                         map[string]bool
+	addonInstallerReady               bool
+	addonController                   *addon.AddonController
+	globalHubConditionControllerReady bool
+	globalHubControllerReady          bool
+	backupControllerReady             bool
+}
+
+func (r *CrdController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	r.resources[req.Name] = true
+	if !r.readyToWatchACMResources() {
+		return ctrl.Result{}, nil
+	}
+
+	// mark the kafka CRDs as ready
+	if r.readyToWatchKafkaResources() {
+		config.SetKafkaResourceReady(true)
+	}
+
+	// start addon installer
+	if !r.addonInstallerReady {
+		if err := (&addon.AddonInstaller{
+			Client: r.GetClient(),
+			Log:    ctrl.Log.WithName("addon-reconciler"),
+		}).SetupWithManager(ctx, r.Manager); err != nil {
+			return ctrl.Result{}, err
+		}
+		r.addonInstallerReady = true
+	}
+
+	// start addon controller
+	if r.addonController == nil {
+		addonController, err := addon.NewAddonController(r.Manager.GetConfig(), r.Manager.GetClient(), r.operatorConfig)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		err = r.Manager.Add(addonController)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		r.addonController = addonController
+	}
+
+	// global hub condition controller
+	if !r.globalHubConditionControllerReady {
+		if err := (&globalhubcontrollers.GlobalHubConditionReconciler{
+			Client: r.Manager.GetClient(),
+			Log:    ctrl.Log.WithName("condition-reconciler"),
+		}).SetupWithManager(r.Manager); err != nil {
+			return ctrl.Result{}, err
+		}
+		r.globalHubConditionControllerReady = true
+	}
+
+	// global hub controller
+	if !r.globalHubControllerReady {
+		err := globalhubcontrollers.NewMulticlusterGlobalHubReconciler(
+			r.Manager,
+			r.addonController.AddonManager(),
+			r.kubeClient,
+			r.operatorConfig).SetupWithManager(r.Manager)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		r.globalHubControllerReady = true
+	}
+
+	// backup controller
+	if !r.backupControllerReady {
+		err := 
backup.NewBackupReconciler(r.Manager, ctrl.Log.WithName("backup-reconciler")).SetupWithManager(r.Manager)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		r.backupControllerReady = true
+	}
+
+	return ctrl.Result{}, nil
+}
+
+func (r *CrdController) readyToWatchACMResources() bool {
+	for _, val := range ACMCrds {
+		if ready := r.resources[val]; !ready {
+			return false
+		}
+	}
+	return true
+}
+
+func (r *CrdController) readyToWatchKafkaResources() bool {
+	for _, val := range KafkaCrds {
+		if ready := r.resources[val]; !ready {
+			return false
+		}
+	}
+	return true
+}
+
+func AddCRDController(mgr ctrl.Manager, operatorConfig *config.OperatorConfig,
+	kubeClient *kubernetes.Clientset,
+) (*CrdController, error) {
+	allCrds := map[string]bool{}
+	for _, val := range ACMCrds {
+		allCrds[val] = false
+	}
+	for _, val := range KafkaCrds {
+		allCrds[val] = false
+	}
+
+	controller := &CrdController{
+		Manager:        mgr,
+		kubeClient:     kubeClient,
+		operatorConfig: operatorConfig,
+		resources:      allCrds,
+	}
+
+	crdPred := predicate.Funcs{
+		CreateFunc: func(e event.CreateEvent) bool {
+			_, ok := allCrds[e.Object.GetName()]
+			return ok
+		},
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			return false
+		},
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			return false
+		},
+	}
+
+	return controller, ctrl.NewControllerManagedBy(mgr).
+		Named("CRDController").
+		WatchesMetadata(
+			&apiextensionsv1.CustomResourceDefinition{},
+			&handler.EnqueueRequestForObject{},
+			builder.WithPredicates(crdPred),
+		).
+		Complete(controller)
+}
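This controller is the core of the patch: every CRD it cares about flips a flag in the resources map (pre-populated to false in AddCRDController), and once all the ACM CRDs have been observed, the remaining controllers are wired into the already-running manager. The boolean guards keep Reconcile idempotent, and a startup error is returned so the request is requeued and retried. Note also the WatchesMetadata call: only CRD metadata is cached, keeping the informer's memory footprint small. Stripped to the control flow, with made-up names (gatedController, startAll), the pattern looks like this:

    // Minimal sketch of the gating pattern; only the control flow mirrors
    // the code above, the names are hypothetical.
    import (
        "context"

        ctrl "sigs.k8s.io/controller-runtime"
    )

    type gatedController struct {
        seen     map[string]bool // pre-populated with every required CRD -> false
        started  bool            // guards the one-time startup
        startAll func() error    // adds the dependent controllers to the manager
    }

    func (c *gatedController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        c.seen[req.Name] = true
        for _, ok := range c.seen {
            if !ok {
                return ctrl.Result{}, nil // still waiting for some CRD
            }
        }
        if !c.started {
            if err := c.startAll(); err != nil {
                return ctrl.Result{}, err // requeued, so startup is retried
            }
            c.started = true
        }
        return ctrl.Result{}, nil
    }
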
diff --git a/operator/pkg/controllers/crd/crd_controller_test.go b/operator/pkg/controllers/crd/crd_controller_test.go
new file mode 100644
index 000000000..fa2a0edb5
--- /dev/null
+++ b/operator/pkg/controllers/crd/crd_controller_test.go
@@ -0,0 +1,168 @@
+// Copyright (c) 2024 Red Hat, Inc.
+// Copyright Contributors to the Open Cluster Management project
+
+package crd
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
+	yamlutil "k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
+)
+
+var (
+	cfg           *rest.Config
+	ctx           context.Context
+	cancel        context.CancelFunc
+	kubeClient    *kubernetes.Clientset
+	dynamicClient *dynamic.DynamicClient
+)
+
+func TestMain(m *testing.M) {
+	ctx, cancel = context.WithCancel(context.Background())
+
+	err := os.Setenv("POD_NAMESPACE", "default")
+	if err != nil {
+		panic(err)
+	}
+	// start testenv
+	testenv := &envtest.Environment{
+		// CRDDirectoryPaths: []string{
+		// 	filepath.Join("..", "..", "..", "pkg", "testdata", "crds"),
+		// },
+		ErrorIfCRDPathMissing: true,
+	}
+
+	cfg, err = testenv.Start()
+	if err != nil {
+		panic(err)
+	}
+
+	if cfg == nil {
+		panic(fmt.Errorf("empty kubeconfig!"))
+	}
+
+	kubeClient, err = kubernetes.NewForConfig(cfg)
+	if err != nil {
+		panic(err)
+	}
+
+	dynamicClient, err = dynamic.NewForConfig(cfg)
+	if err != nil {
+		panic(err)
+	}
+
+	// run the tests
+	code := m.Run()
+
+	// stop testenv
+	if err := testenv.Stop(); err != nil {
+		panic(err)
+	}
+	os.Exit(code)
+}
+
+func TestCRDCtr(t *testing.T) {
+	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+		Metrics: metricsserver.Options{
+			BindAddress: "0", // disable the metrics serving
+		}, Scheme: scheme.Scheme,
+	})
+	assert.Nil(t, err)
+
+	controller, err := AddCRDController(mgr, &config.OperatorConfig{}, nil)
+	assert.Nil(t, err)
+
+	go func() {
+		err := mgr.Start(ctx)
+		assert.Nil(t, err)
+	}()
+	assert.True(t, mgr.GetCache().WaitForCacheSync(ctx))
+
+	clusterResource := "managedclusters.cluster.open-cluster-management.io"
+	clusterResourceFile := "0000_00_cluster.open-cluster-management.io_managedclusters.crd.yaml"
+
+	for _, ok := range controller.resources {
+		assert.False(t, ok)
+	}
+
+	err = applyYaml(filepath.Join("..", "..", "..", "..", "pkg", "testdata", "crds", clusterResourceFile))
+	assert.Nil(t, err)
+	time.Sleep(1 * time.Second)
+
+	for resource, ok := range controller.resources {
+		if resource == clusterResource {
+			assert.True(t, ok)
+		} else {
+			assert.False(t, ok)
+		}
+	}
+	cancel()
+}
+
+func applyYaml(file string) error {
+	b, err := os.ReadFile(file)
+	if err != nil {
+		return err
+	}
+	decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader(b), 100)
+	var rawObj runtime.RawExtension
+	if err = decoder.Decode(&rawObj); err != nil {
+		return err
+	}
+	obj, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil)
+	if err != nil {
+		return err
+	}
+	unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+	if err != nil {
+		return err
+	}
+
+	unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}
+
+	gr, err := restmapper.GetAPIGroupResources(kubeClient.Discovery())
+	if err != nil {
+		return err
+	}
+
+	mapper := restmapper.NewDiscoveryRESTMapper(gr)
+	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		return err
+	}
+
+	var dri dynamic.ResourceInterface
+	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+		if unstructuredObj.GetNamespace() == "" {
+			unstructuredObj.SetNamespace("default")
+		}
+		dri = dynamicClient.Resource(mapping.Resource).Namespace(unstructuredObj.GetNamespace())
+	} else {
+		dri = dynamicClient.Resource(mapping.Resource)
+	}
+
+	_, err = dri.Create(ctx, unstructuredObj, metav1.CreateOptions{})
+	return err
+}
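One caveat in TestCRDCtr: the fixed one-second time.Sleep between applying the CRD and asserting on controller.resources can flake on a slow runner. A polling wait is sturdier; a sketch using apimachinery's wait helpers (waitForCRD is a hypothetical name, and it reads the map unsynchronized exactly as the assertions above already do):

    // Hypothetical replacement for the fixed sleep: poll until the controller
    // has observed the CRD, or time out after ten seconds.
    import (
        "context"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func waitForCRD(ctx context.Context, c *CrdController, name string) error {
        return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true,
            func(ctx context.Context) (bool, error) {
                return c.resources[name], nil // done once the CRD has been seen
            })
    }
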
diff --git a/operator/pkg/controllers/hubofhubs/globalhub_condition.go b/operator/pkg/controllers/hubofhubs/globalhub_condition.go
index ddfe9f611..abe912437 100644
--- a/operator/pkg/controllers/hubofhubs/globalhub_condition.go
+++ b/operator/pkg/controllers/hubofhubs/globalhub_condition.go
@@ -107,9 +107,9 @@ func (r *GlobalHubConditionReconciler) SetupWithManager(mgr manager.Manager) err
 		},
 	}
 	return ctrl.NewControllerManagedBy(mgr).
+		Named("conditionController").
 		For(&globalhubv1alpha4.MulticlusterGlobalHub{}, builder.WithPredicates(mghPred)).
 		Watches(&appsv1.Deployment{},
-			handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
-				&globalhubv1alpha4.MulticlusterGlobalHub{})).
+			handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &globalhubv1alpha4.MulticlusterGlobalHub{})).
 		Complete(r)
 }
diff --git a/operator/pkg/controllers/hubofhubs/globalhub_controller.go b/operator/pkg/controllers/hubofhubs/globalhub_controller.go
index f84832617..070cf8c24 100644
--- a/operator/pkg/controllers/hubofhubs/globalhub_controller.go
+++ b/operator/pkg/controllers/hubofhubs/globalhub_controller.go
@@ -39,7 +39,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
 	"open-cluster-management.io/addon-framework/pkg/addonmanager"
-	addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
+	"open-cluster-management.io/api/addon/v1alpha1"
 	clusterv1 "open-cluster-management.io/api/cluster/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -55,12 +55,11 @@ import (
 	"github.com/stolostron/multicluster-global-hub/operator/pkg/condition"
 	"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
 	operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon"
 	"github.com/stolostron/multicluster-global-hub/operator/pkg/postgres"
 	transportprotocol "github.com/stolostron/multicluster-global-hub/operator/pkg/transporter"
 	"github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
 	"github.com/stolostron/multicluster-global-hub/pkg/constants"
-	commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
-	"github.com/stolostron/multicluster-global-hub/pkg/transport"
 )
 
 //go:embed manifests
@@ -86,10 +85,9 @@ var watchedConfigmap = sets.NewString(
 type MulticlusterGlobalHubReconciler struct {
 	manager.Manager
 	client.Client
-	AddonManager addonmanager.AddonManager
-	KubeClient   kubernetes.Interface
 	Scheme       *runtime.Scheme
-	LeaderElection *commonobjects.LeaderElectionConfig
+	addonMgr   addonmanager.AddonManager
+	KubeClient kubernetes.Interface
 	Log              logr.Logger
 	LogLevel         string
 	MiddlewareConfig *MiddlewareConfig
@@ -99,10 +97,20 @@ type MulticlusterGlobalHubReconciler struct {
 	KafkaController *KafkaController
 }
 
-// MiddlewareConfig defines the configuration for middleware and shared in opearator
-type 
MiddlewareConfig struct { - StorageConn *postgres.PostgresConnection - TransportConn *transport.ConnCredential +func NewMulticlusterGlobalHubReconciler(mgr ctrl.Manager, addonMgr addonmanager.AddonManager, + kubeClient kubernetes.Interface, operatorConfig *config.OperatorConfig, +) *MulticlusterGlobalHubReconciler { + return &MulticlusterGlobalHubReconciler{ + Manager: mgr, + Client: mgr.GetClient(), + addonMgr: addonMgr, + KubeClient: kubeClient, + Scheme: mgr.GetScheme(), + Log: ctrl.Log.WithName("global-hub-reconciler"), + MiddlewareConfig: &MiddlewareConfig{}, + EnableGlobalResource: operatorConfig.GlobalResourceEnabled, + LogLevel: operatorConfig.LogLevel, + } } // +kubebuilder:rbac:groups=operator.open-cluster-management.io,resources=multiclusterglobalhubs,verbs=get;list;watch;create;update;patch;delete @@ -301,9 +309,13 @@ func (r *MulticlusterGlobalHubReconciler) reconcileGlobalHub(ctx context.Context } // reconcile addon - r.Log.Info("trigger addon on managed clusters", "size", len(config.GetManagedClusters())) - for _, clusterName := range config.GetManagedClusters() { - r.AddonManager.Trigger(clusterName, operatorconstants.GHClusterManagementAddonName) + managedHubs, err := addon.GetAllManagedHubNames(ctx, r.Client) + if err != nil { + return err + } + r.Log.Info("trigger addon on managed clusters", "size", len(managedHubs)) + for _, clusterName := range managedHubs { + r.addonMgr.Trigger(clusterName, operatorconstants.GHClusterManagementAddonName) } return nil @@ -500,16 +512,21 @@ func (r *MulticlusterGlobalHubReconciler) SetupWithManager(mgr ctrl.Manager) err // secondary watch for clusterrolebinding Watches(&rbacv1.ClusterRoleBinding{}, globalHubEventHandler, builder.WithPredicates(resPred)). - // secondary watch for clustermanagementaddon - Watches(&addonv1alpha1.ClusterManagementAddOn{}, - globalHubEventHandler, builder.WithPredicates(resPred)). + // secondary watch for Secret Watches(&corev1.Secret{}, globalHubEventHandler, builder.WithPredicates(secretPred)). + // secondary watch for clustermanagementaddon + Watches(&v1alpha1.ClusterManagementAddOn{}, + globalHubEventHandler, + builder.WithPredicates(resPred)). + Watches(&clusterv1.ManagedCluster{}, + globalHubEventHandler, + builder.WithPredicates(mhPred)). Watches(&promv1.ServiceMonitor{}, - globalHubEventHandler, builder.WithPredicates(resPred)). + globalHubEventHandler, + builder.WithPredicates(resPred)). Watches(&subv1alpha1.Subscription{}, - globalHubEventHandler, builder.WithPredicates(deletePred)). - Watches(&clusterv1.ManagedCluster{}, - globalHubEventHandler, builder.WithPredicates(mhPred)). + globalHubEventHandler, + builder.WithPredicates(deletePred)). 
 		Complete(r)
 }
diff --git a/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go b/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go
index db4857a7b..90a64f488 100644
--- a/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go
+++ b/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go
@@ -8,37 +8,31 @@ import (
 
 	kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
 	"github.com/go-logr/logr"
-	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
-	globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
-	"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
 	"github.com/stolostron/multicluster-global-hub/pkg/transport"
 	"github.com/stolostron/multicluster-global-hub/pkg/utils"
 )
 
+// kafkaReconcileHandler is invoked to re-render the kafka resources whenever they change, and returns the refreshed connection.
+type kafkaReconcileHandler func() (*transport.ConnCredential, error)
+
 // KafkaController reconciles the kafka crd
 type KafkaController struct {
-	Log                 logr.Logger
-	mgr                 ctrl.Manager
-	globalHubReconciler *MulticlusterGlobalHubReconciler
-	conn                *transport.ConnCredential
+	Log              logr.Logger
+	mgr              ctrl.Manager
+	reconcileHandler kafkaReconcileHandler
+	conn             *transport.ConnCredential
 }
 
 func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
 	// get the mcgh cr name and then trigger the globalhub reconciler
-	mgh := &globalhubv1alpha4.MulticlusterGlobalHub{}
-	err := r.mgr.GetClient().Get(ctx, config.GetMGHNamespacedName(), mgh)
-	if err != nil {
-		r.Log.Error(err, "failed to get MulticlusterGlobalHub")
-		return ctrl.Result{}, err
-	}
-
-	r.conn, err = r.globalHubReconciler.ReconcileTransport(ctx, mgh, transport.StrimziTransporter)
+	var err error
+	r.conn, err = r.reconcileHandler()
 	if err != nil {
 		r.Log.Error(err, "failed to get connection from kafka reconciler")
 		return ctrl.Result{}, err
@@ -60,12 +54,12 @@ var kafkaPred = predicate.Funcs{
 
 // initialize the kafka transporter and then start the kafka/user/topic controller
 func startKafkaController(ctx context.Context, mgr ctrl.Manager,
-	reconciler *MulticlusterGlobalHubReconciler,
+	getConnFunc kafkaReconcileHandler,
 ) (*KafkaController, error) {
 	r := &KafkaController{
-		Log:                 ctrl.Log.WithName("kafka-controller"),
-		mgr:                 mgr,
-		globalHubReconciler: reconciler,
+		Log:              ctrl.Log.WithName("kafka-controller"),
+		mgr:              mgr,
+		reconcileHandler: getConnFunc,
 	}
 	_, err := r.Reconcile(ctx, ctrl.Request{})
 	if err != nil {
@@ -87,44 +81,3 @@ func startKafkaController(ctx context.Context, mgr ctrl.Manager,
 	r.Log.Info("kafka controller is started")
 	return r, nil
 }
-
-type kafkaCRDController struct {
-	mgr                ctrl.Manager
-	globaHubReconciler *MulticlusterGlobalHubReconciler
-}
-
-func (c *kafkaCRDController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
-	if c.globaHubReconciler.KafkaController == nil {
-		reconclier, err := startKafkaController(ctx, c.mgr, c.globaHubReconciler)
-		if err != nil {
-			return ctrl.Result{}, err
-		}
-		c.globaHubReconciler.KafkaController = reconclier
-	}
-	return ctrl.Result{}, nil
-}
-
-// this controller is used to watch the Kafka/KafkaTopic/KafkaUser crd
-// if the crd exists, then add controllers to the manager dynamically
-func addKafkaCRDController(mgr ctrl.Manager, reconciler *MulticlusterGlobalHubReconciler) error {
-	return ctrl.NewControllerManagedBy(mgr).
-		For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.Funcs{
-			// trigger the reconciler only if the crd is created
-			CreateFunc: func(e event.CreateEvent) bool {
-				return e.Object.GetName() == "kafkas.kafka.strimzi.io"
-			},
-			UpdateFunc: func(e event.UpdateEvent) bool {
-				return false
-			},
-			DeleteFunc: func(e event.DeleteEvent) bool {
-				return false
-			},
-			GenericFunc: func(e event.GenericEvent) bool {
-				return false
-			},
-		})).
-		Complete(&kafkaCRDController{
-			mgr:                mgr,
-			globaHubReconciler: reconciler,
-		})
-}
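KafkaController no longer reaches back into MulticlusterGlobalHubReconciler: it is handed a kafkaReconcileHandler closure instead, which breaks the circular dependency between the two types and lets the dedicated kafkaCRDController above be deleted outright. The closure is supplied by the caller; this mirrors the call that the ReconcileMiddleware hunk further down in this patch makes, where r is the global hub reconciler and mgh the CR being reconciled:

    // From globalhub_middleware.go below: the reconciler passes a closure
    // that re-renders the Kafka resources and returns the fresh connection.
    kafkaController, err := startKafkaController(ctx, r.Manager,
        func() (*transport.ConnCredential, error) {
            return r.ReconcileTransport(ctx, mgh, transport.StrimziTransporter)
        })
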
diff --git a/operator/pkg/controllers/hubofhubs/globalhub_manager.go b/operator/pkg/controllers/hubofhubs/globalhub_manager.go
index b060acd66..f370d42b5 100644
--- a/operator/pkg/controllers/hubofhubs/globalhub_manager.go
+++ b/operator/pkg/controllers/hubofhubs/globalhub_manager.go
@@ -91,7 +91,7 @@ func (r *MulticlusterGlobalHubReconciler) reconcileManager(ctx context.Context,
 	transportTopic := trans.GenerateClusterTopic(transportprotocol.GlobalHubClusterName)
 	transportConn, err := trans.GetConnCredential(transportprotocol.DefaultGlobalHubKafkaUser)
 	if err != nil {
-		return fmt.Errorf("failed to get global hub transport connection: %v", err)
+		return fmt.Errorf("failed to get global hub transport connection: %w", err)
 	}
 
 	if r.MiddlewareConfig.StorageConn == nil {
@@ -100,9 +100,14 @@ func (r *MulticlusterGlobalHubReconciler) reconcileManager(ctx context.Context,
 	if isMiddlewareUpdated(r.MiddlewareConfig) {
 		err = commonutils.RestartPod(ctx, r.KubeClient, commonutils.GetDefaultNamespace(), constants.ManagerDeploymentName)
 		if err != nil {
-			return fmt.Errorf("failed to restart manager pod: %v", err)
+			return fmt.Errorf("failed to restart manager pod: %w", err)
 		}
 	}
+	electionConfig, err := config.GetElectionConfig()
+	if err != nil {
+		return fmt.Errorf("failed to get the electionConfig: %w", err)
+	}
+
 	managerObjects, err := hohRenderer.Render("manifests/manager", "", func(profile string) (interface{}, error) {
 		return ManagerVariables{
 			Image:              config.GetImage(config.GlobalHubManagerImageKey),
@@ -125,9 +130,9 @@ func (r *MulticlusterGlobalHubReconciler) reconcileManager(ctx context.Context,
 			Namespace:              commonutils.GetDefaultNamespace(),
 			MessageCompressionType: string(operatorconstants.GzipCompressType),
 			TransportType:          string(transport.Kafka),
-			LeaseDuration:          strconv.Itoa(r.LeaderElection.LeaseDuration),
-			RenewDeadline:          strconv.Itoa(r.LeaderElection.RenewDeadline),
-			RetryPeriod:            strconv.Itoa(r.LeaderElection.RetryPeriod),
+			LeaseDuration:          strconv.Itoa(electionConfig.LeaseDuration),
+			RenewDeadline:          strconv.Itoa(electionConfig.RenewDeadline),
+			RetryPeriod:            strconv.Itoa(electionConfig.RetryPeriod),
 			SchedulerInterval:      config.GetSchedulerInterval(mgh),
 			SkipAuth:               config.SkipAuth(mgh),
 			LaunchJobNames:         config.GetLaunchJobNames(mgh),
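The reconciler also stops carrying leader-election values around: reconcileManager now asks the config package, which the operator fills in at startup via config.LoadControllerConfig (see the suite change earlier). controller_config.go is created by this patch but not shown in this hunk, so the following is only an assumption about its shape; the 137/107/26 defaults are the ones the deleted getElectionConfig helper used:

    // Hypothetical sketch of config.GetElectionConfig; the real implementation
    // is in operator/pkg/config/controller_config.go, which this hunk omits.
    func GetElectionConfig() (*commonobjects.LeaderElectionConfig, error) {
        cfg := &commonobjects.LeaderElectionConfig{
            LeaseDuration: 137, // defaults, overridden by the controller ConfigMap
            RenewDeadline: 107,
            RetryPeriod:   26,
        }
        if loaded := cachedControllerConfig(); loaded != nil { // made-up helper,
            cfg = loaded // standing in for whatever LoadControllerConfig stored
        }
        return cfg, nil
    }
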
diff --git a/operator/pkg/controllers/hubofhubs/globalhub_middleware.go b/operator/pkg/controllers/hubofhubs/globalhub_middleware.go
index 99395f2e5..c34b61220 100644
--- a/operator/pkg/controllers/hubofhubs/globalhub_middleware.go
+++ b/operator/pkg/controllers/hubofhubs/globalhub_middleware.go
@@ -45,6 +45,11 @@ import (
 	"github.com/stolostron/multicluster-global-hub/pkg/utils"
 )
 
+type MiddlewareConfig struct {
+	StorageConn   *postgres.PostgresConnection
+	TransportConn *transport.ConnCredential
+}
+
 // ReconcileMiddleware creates the kafka and postgres if needed.
 // 1. create the kafka and postgres subscription at the same time
 // 2. then create the kafka and postgres resources at the same time
@@ -83,12 +88,23 @@ func (r *MulticlusterGlobalHubReconciler) ReconcileMiddleware(ctx context.Contex
 			return
 		}
 
-		err = addKafkaCRDController(r.Manager, r)
-		if err != nil {
-			errorChan <- err
+		if !config.GetKafkaResourceReady() {
+			errorChan <- errors.New("kafka resources are not ready")
 			return
 		}
 
+		// start the kafka controller
+		if r.KafkaController == nil {
+			kafkaController, err := startKafkaController(ctx, r.Manager, func() (*transport.ConnCredential, error) {
+				return r.ReconcileTransport(ctx, mgh, transport.StrimziTransporter)
+			})
+			if err != nil {
+				errorChan <- err
+				return
+			}
+			r.KafkaController = kafkaController
+		}
+		r.KafkaInit = true
 	}
 
 	if r.KafkaController == nil || r.KafkaController.conn == nil {
diff --git a/operator/pkg/controllers/hubofhubs/integration_test.go b/operator/pkg/controllers/hubofhubs/integration_test.go
index d5850bc3d..5bc2e86d5 100644
--- a/operator/pkg/controllers/hubofhubs/integration_test.go
+++ b/operator/pkg/controllers/hubofhubs/integration_test.go
@@ -989,7 +989,6 @@ var _ = Describe("MulticlusterGlobalHub controller", Ordered, func() {
 			Eventually(func() error {
 				mghReconciler.MiddlewareConfig.StorageConn = nil
 				mghReconciler.MiddlewareConfig.TransportConn = nil
-				mghReconciler.ReconcileMiddleware(ctx, mcgh)
 
 				if mghReconciler.MiddlewareConfig.StorageConn == nil {
 					return fmt.Errorf("mghReconciler.MiddlewareConfig.PgConnection should be nil")
diff --git a/operator/pkg/controllers/hubofhubs/suite_test.go b/operator/pkg/controllers/hubofhubs/suite_test.go
index 7bd31c6c0..988c31be7 100644
--- a/operator/pkg/controllers/hubofhubs/suite_test.go
+++ b/operator/pkg/controllers/hubofhubs/suite_test.go
@@ -51,8 +51,8 @@ import (
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
 	globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
+	"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
 	hubofhubscontroller "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs"
-	commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
 	"github.com/stolostron/multicluster-global-hub/test/pkg/kafka"
 	"github.com/stolostron/multicluster-global-hub/test/pkg/testpostgres"
 )
@@ -93,6 +93,7 @@ var _ = BeforeSuite(func() {
 		},
 		ErrorIfCRDPathMissing: true,
 	}
+	config.SetKafkaResourceReady(true)
 
 	testEnv.ControlPlane.GetAPIServer().Configure().Set("disable-admission-plugins",
 		"ServiceAccount,MutatingAdmissionWebhook,ValidatingAdmissionWebhook")
@@ -155,19 +156,11 @@ var _ = BeforeSuite(func() {
 	err = kafka.CreateTestTransportSecret(k8sClient, testNamespace)
 	Expect(err).Should(Succeed())
 
-	// the leader election will be propagate to global hub manager
-	leaderElection := &commonobjects.LeaderElectionConfig{
-		LeaseDuration: 137,
-		RenewDeadline: 107,
-		RetryPeriod:   26,
-	}
-
 	mghReconciler = &hubofhubscontroller.MulticlusterGlobalHubReconciler{
 		Manager:          k8sManager,
 		Client:           k8sManager.GetClient(),
 		KubeClient:       kubeClient,
 		Scheme:           k8sManager.GetScheme(),
-		LeaderElection:   leaderElection,
 		Log:              ctrl.Log.WithName("multicluster-global-hub-reconciler"),
 		LogLevel:         "info",
 		MiddlewareConfig: &hubofhubscontroller.MiddlewareConfig{},