diff --git a/agent/cmd/agent/main.go b/agent/cmd/agent/main.go index 974279125..9da690544 100644 --- a/agent/cmd/agent/main.go +++ b/agent/cmd/agent/main.go @@ -34,8 +34,10 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/jobs" commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" "github.com/stolostron/multicluster-global-hub/pkg/transport" + "github.com/stolostron/multicluster-global-hub/pkg/transport/controller" "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" "github.com/stolostron/multicluster-global-hub/pkg/utils" + commonutils "github.com/stolostron/multicluster-global-hub/pkg/utils" ) const ( @@ -88,7 +90,7 @@ func doMain(ctx context.Context, restConfig *rest.Config, agentConfig *config.Ag go utils.StartDefaultPprofServer() } - mgr, err := createManager(restConfig, agentConfig) + mgr, err := createManager(ctx, restConfig, agentConfig) if err != nil { setupLog.Error(err, "failed to create manager") return 1 @@ -121,35 +123,16 @@ func parseFlags() *config.AgentConfig { pflag.CommandLine.AddGoFlagSet(defaultFlags) pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "", - "The bootstrap server for kafka.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "", - "The path of CA certificate for kafka bootstrap server.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "", - "The path of client certificate for kafka bootstrap server.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "", - "The path of client key for kafka bootstrap server.") pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "", "Producer Id for the kafka, default is the leaf hub name.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic", - "event", "Topic for the kafka producer.") - pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, - "kafka-message-size-limit", 940, "The limit for kafka message size in KB.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic", - "spec", "Topic for the kafka consumer.") pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id", "multicluster-global-hub-agent", "ID for the kafka consumer.") pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace, "The agent running namespace, also used as leader election namespace") - pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka", - "The transport type, 'kafka'") pflag.IntVar(&agentConfig.SpecWorkPoolSize, "consumer-worker-pool-size", 10, "The goroutine number to propagate the bundles on managed cluster.") pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false, "enable hoh RBAC or not, default false") - pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType, - "transport-message-compression-type", "gzip", - "The message compression type for transport layer, 'gzip' or 'no-op'.") pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor, "status-delta-count-switch-factor", 100, "default with 100.") @@ -199,7 +182,7 @@ func completeConfig(agentConfig *config.AgentConfig) error { return nil } -func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) ( +func createManager(ctx context.Context, restConfig *rest.Config, agentConfig *config.AgentConfig) ( ctrl.Manager, error, ) { leaseDuration := time.Duration(agentConfig.ElectionConfig.LeaseDuration) * time.Second @@ -236,27 +219,51 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) ( if err != nil { return nil, fmt.Errorf("failed to create a new manager: %w", err) } - kubeClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to create kubeclient: %w", err) - } - // Need this controller to update the value of clusterclaim hub.open-cluster-management.io - // we use the value to decide whether install the ACM or not - if err := controllers.AddHubClusterClaimController(mgr); err != nil { - return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err) - } - - if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil { - return nil, fmt.Errorf("failed to add crd controller: %w", err) - } - if err := controllers.AddCertController(mgr, kubeClient); err != nil { - return nil, fmt.Errorf("failed to add crd controller: %w", err) + transportCtrl := controller.NewTransportCtrl( + agentConfig.PodNameSpace, + constants.GHAgentTransportSecret, + agentConfig.TransportConfig.KafkaConfig, + transportCallback(mgr, agentConfig)) + if err = transportCtrl.SetupWithManager(ctx, mgr, []string{ + constants.GHAgentTransportSecret, + commonutils.AgentCertificateSecretName(), + }); err != nil { + return nil, err } + setupLog.Info("add the transport controller to agent") return mgr, nil } +// if the transport consumer and producer is ready then the func will be invoked by the transport controller +func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig, +) controller.TransportCallback { + return func(producer transport.Producer, consumer transport.Consumer) error { + kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + return fmt.Errorf("failed to create kubeclient: %w", err) + } + + // Need this controller to update the value of clusterclaim hub.open-cluster-management.io + // we use the value to decide whether install the ACM or not + if err := controllers.AddHubClusterClaimController(mgr); err != nil { + return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err) + } + + if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig); err != nil { + return fmt.Errorf("failed to add crd controller: %w", err) + } + + if err := controllers.AddCertController(mgr, kubeClient); err != nil { + return fmt.Errorf("failed to add crd controller: %w", err) + } + + setupLog.Info("add the agent controllers to manager") + return nil + } +} + func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) { cacheOpts.ByObject = map[client.Object]cache.ByObject{ &apiextensionsv1.CustomResourceDefinition{}: { diff --git a/manager/cmd/manager/main.go b/manager/cmd/manager/main.go index 78014eaac..1ad95c62e 100644 --- a/manager/cmd/manager/main.go +++ b/manager/cmd/manager/main.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" "github.com/stolostron/multicluster-global-hub/manager/pkg/backup" + "github.com/stolostron/multicluster-global-hub/manager/pkg/config" managerconfig "github.com/stolostron/multicluster-global-hub/manager/pkg/config" "github.com/stolostron/multicluster-global-hub/manager/pkg/cronjob" "github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement" @@ -39,6 +40,7 @@ import ( commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" + "github.com/stolostron/multicluster-global-hub/pkg/transport/controller" "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -72,6 +74,7 @@ func parseFlags() *managerconfig.ManagerConfig { DatabaseConfig: &managerconfig.DatabaseConfig{}, TransportConfig: &transport.TransportConfig{ KafkaConfig: &transport.KafkaConfig{ + ConnCredential: &transport.ConnCredential{}, EnableTLS: true, Topics: &transport.ClusterTopic{}, ProducerConfig: &transport.KafkaProducerConfig{}, @@ -111,32 +114,14 @@ func parseFlags() *managerconfig.ManagerConfig { "transport-bridge-database-url", "", "The URL of database server for the transport-bridge user.") pflag.StringVar(&managerConfig.TransportConfig.TransportType, "transport-type", "kafka", "The transport type, 'kafka'.") - pflag.StringVar(&managerConfig.TransportConfig.MessageCompressionType, "transport-message-compression-type", - "gzip", "The message compression type for transport layer, 'gzip' or 'no-op'.") - pflag.DurationVar(&managerConfig.TransportConfig.CommitterInterval, "transport-committer-interval", - 40*time.Second, "The committer interval for transport layer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", - "kafka-kafka-bootstrap.kafka.svc:9092", "The bootstrap server for kafka.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClusterIdentity, "kafka-cluster-identity", - "", "The identity for kafka cluster.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "", - "The path of CA certificate for kafka bootstrap server.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "", - "The path of client certificate for kafka bootstrap server.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "", - "The path of client key for kafka bootstrap server.") pflag.StringVar(&managerConfig.DatabaseConfig.CACertPath, "postgres-ca-path", "/postgres-ca/ca.crt", "The path of CA certificate for kafka bootstrap server.") pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "multicluster-global-hub-manager", "ID for the kafka producer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-producer-topic", - "spec", "Topic for the kafka producer.") pflag.IntVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, "kafka-message-size-limit", 940, "The limit for kafka message size in KB.") pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id", "multicluster-global-hub-manager", "ID for the kafka consumer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, - "kafka-consumer-topic", "event", "Topic for the kafka consumer.") pflag.StringVar(&managerConfig.StatisticsConfig.LogInterval, "statistics-log-interval", "1m", "The log interval for statistics.") pflag.StringVar(&managerConfig.NonK8sAPIServerConfig.ClusterAPIURL, "cluster-api-url", @@ -242,51 +227,61 @@ func createManager(ctx context.Context, return nil, fmt.Errorf("failed to create a new manager: %w", err) } - producer, err := producer.NewGenericProducer(managerConfig.TransportConfig, - managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic) - if err != nil { - return nil, fmt.Errorf("failed to init spec transport bridge: %w", err) - } - // TODO: refactor the manager to start the conflation manager so that it can handle the events from restful API - if !managerConfig.WithACM { - return mgr, nil + transportCtrl := controller.NewTransportCtrl( + managerConfig.ManagerNamespace, + constants.GHManagerTransportSecret, + managerConfig.TransportConfig.KafkaConfig, + transportCallback(ctx, mgr, managerConfig, sqlConn)) + if err = transportCtrl.SetupWithManager(ctx, mgr, []string{constants.GHManagerTransportSecret}); err != nil { + return nil, err } + setupLog.Info("add the transport controller to manager") + return mgr, nil +} - if managerConfig.EnableGlobalResource { - if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil { - return nil, fmt.Errorf("failed to add non-k8s-api-server: %w", err) +// if the transport consumer and producer is ready then the func will be invoked by the transport controller +func transportCallback(ctx context.Context, mgr ctrl.Manager, managerConfig *config.ManagerConfig, + sqlConn *sql.Conn, +) controller.TransportCallback { + return func(producer transport.Producer, consumer transport.Consumer) error { + if !managerConfig.WithACM { + return nil } - } - if managerConfig.EnableGlobalResource { - if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil { - return nil, fmt.Errorf("failed to add global resource spec syncers: %w", err) + if managerConfig.EnableGlobalResource { + if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil { + return fmt.Errorf("failed to add non-k8s-api-server: %w", err) + } + if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil { + return fmt.Errorf("failed to add global resource spec syncers: %w", err) + } } - } - if err := statussyncer.AddStatusSyncers(mgr, managerConfig); err != nil { - return nil, fmt.Errorf("failed to add transport-to-db syncers: %w", err) - } + if err := statussyncer.AddStatusSyncers(mgr, consumer, managerConfig); err != nil { + return fmt.Errorf("failed to add transport-to-db syncers: %w", err) + } - // add hub management - if err := hubmanagement.AddHubManagement(mgr, producer); err != nil { - return nil, fmt.Errorf("failed to add hubmanagement to manager - %w", err) - } + // add hub management + if err := hubmanagement.AddHubManagement(mgr, producer); err != nil { + return fmt.Errorf("failed to add hubmanagement to manager - %w", err) + } - // need lock DB for backup - backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn) - err = backupPVC.SetupWithManager(mgr) - if err != nil { - return nil, err - } + // need lock DB for backup + backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn) + err := backupPVC.SetupWithManager(mgr) + if err != nil { + return err + } - if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil { - return nil, fmt.Errorf("failed to add scheduler to manager: %w", err) - } + if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil { + return fmt.Errorf("failed to add scheduler to manager: %w", err) + } - return mgr, nil + setupLog.Info("add the manager controllers to ctrl.Manager") + return nil + } } // function to handle defers with exit, see https://stackoverflow.com/a/27629493/553720. diff --git a/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go b/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go index 19d37bdb0..deb7d11c6 100644 --- a/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go +++ b/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go @@ -11,7 +11,6 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" - genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" ) // Get message from transport, convert it to bundle and forward it to conflation manager. @@ -22,21 +21,9 @@ type TransportDispatcher struct { statistic *statistics.Statistics } -func AddTransportDispatcher(mgr ctrl.Manager, managerConfig *config.ManagerConfig, +func AddTransportDispatcher(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig, conflationManager *conflator.ConflationManager, stats *statistics.Statistics, ) error { - // start a consumer - topics := managerConfig.TransportConfig.KafkaConfig.Topics - consumer, err := genericconsumer.NewGenericConsumer(managerConfig.TransportConfig, - []string{topics.StatusTopic}, - genericconsumer.EnableDatabaseOffset(true)) - if err != nil { - return fmt.Errorf("failed to initialize transport consumer: %w", err) - } - if err := mgr.Add(consumer); err != nil { - return fmt.Errorf("failed to add transport consumer to manager: %w", err) - } - transportDispatcher := &TransportDispatcher{ log: ctrl.Log.WithName("conflation-dispatcher"), consumer: consumer, diff --git a/manager/pkg/statussyncer/syncers.go b/manager/pkg/statussyncer/syncers.go index fc0d61b56..70a3987e7 100644 --- a/manager/pkg/statussyncer/syncers.go +++ b/manager/pkg/statussyncer/syncers.go @@ -10,11 +10,12 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/dispatcher" dbsyncer "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/syncers" "github.com/stolostron/multicluster-global-hub/pkg/statistics" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) // AddStatusSyncers performs the initial setup required before starting the runtime manager. // adds controllers and/or runnables to the manager, registers handler to conflation manager -func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) error { +func AddStatusSyncers(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig) error { // create statistics stats := statistics.NewStatistics(managerConfig.StatisticsConfig) if err := mgr.Add(stats); err != nil { @@ -26,7 +27,7 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) err registerHandler(conflationManager, managerConfig.EnableGlobalResource) // start consume message from transport to conflation manager - if err := dispatcher.AddTransportDispatcher(mgr, managerConfig, conflationManager, stats); err != nil { + if err := dispatcher.AddTransportDispatcher(mgr, consumer, managerConfig, conflationManager, stats); err != nil { return err } diff --git a/operator/pkg/constants/constants.go b/operator/pkg/constants/constants.go index a9810735c..d80e9c9e7 100644 --- a/operator/pkg/constants/constants.go +++ b/operator/pkg/constants/constants.go @@ -80,7 +80,6 @@ const ( // global hub agent constants const ( GHClusterManagementAddonName = "multicluster-global-hub-controller" - GHManagedClusterAddonName = "multicluster-global-hub-controller" ) // global hub names diff --git a/operator/pkg/controllers/addon/addon_controller.go b/operator/pkg/controllers/addon/addon_controller.go index 20d1b9f4f..b2dad1069 100644 --- a/operator/pkg/controllers/addon/addon_controller.go +++ b/operator/pkg/controllers/addon/addon_controller.go @@ -7,6 +7,7 @@ import ( "github.com/go-logr/logr" operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1" certificatesv1 "k8s.io/api/certificates/v1" "k8s.io/apimachinery/pkg/runtime" @@ -25,7 +26,6 @@ import ( globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/transport" diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 4db16f693..36ce0381b 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -23,10 +23,10 @@ import ( globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" - "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/transport" + commonutils "github.com/stolostron/multicluster-global-hub/pkg/utils" ) //go:embed manifests/templates @@ -36,40 +36,41 @@ import ( var FS embed.FS type ManifestsConfig struct { - HoHAgentImage string - ImagePullSecretName string - ImagePullSecretData string - ImagePullPolicy string - LeafHubID string - KafkaBootstrapServer string - TransportType string - KafkaCACert string - KafkaClientCert string - KafkaClientKey string - KafkaClientCertSecret string - KafkaConsumerTopic string - KafkaProducerTopic string - MessageCompressionType string - InstallACMHub bool - Channel string - CurrentCSV string - Source string - SourceNamespace string - InstallHostedMode bool - LeaseDuration string - RenewDeadline string - RetryPeriod string - KlusterletNamespace string - KlusterletWorkSA string - NodeSelector map[string]string - Tolerations []corev1.Toleration - AggregationLevel string - EnableLocalPolicies string - EnableGlobalResource bool - AgentQPS float32 - AgentBurst int - LogLevel string - EnablePprof bool + HoHAgentImage string + ImagePullSecretName string + ImagePullSecretData string + ImagePullPolicy string + LeafHubID string + KafkaBootstrapServer string + TransportType string + TransportSecretName string + KafkaClusterID string + KafkaCACert string + KafkaClientCert string + KafkaClientKey string + KafkaClientCertSecret string + KafkaSpecTopic string + KafkaStatusTopic string + InstallACMHub bool + Channel string + CurrentCSV string + Source string + SourceNamespace string + InstallHostedMode bool + LeaseDuration string + RenewDeadline string + RetryPeriod string + KlusterletNamespace string + KlusterletWorkSA string + NodeSelector map[string]string + Tolerations []corev1.Toleration + AggregationLevel string + EnableLocalPolicies string + EnableGlobalResource bool + AgentQPS float32 + AgentBurst int + LogLevel string + EnablePprof bool // cannot use *corev1.ResourceRequirements, addonfactory.StructToValues removes the real value Resources *Resources } @@ -220,29 +221,30 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, } manifestsConfig := ManifestsConfig{ - HoHAgentImage: image, - ImagePullPolicy: string(imagePullPolicy), - LeafHubID: cluster.Name, - KafkaBootstrapServer: kafkaConnection.BootstrapServer, - KafkaCACert: kafkaConnection.CACert, - KafkaClientCert: kafkaConnection.ClientCert, - KafkaClientKey: kafkaConnection.ClientKey, - KafkaClientCertSecret: certificates.AgentCertificateSecretName(), - KafkaConsumerTopic: clusterTopic.SpecTopic, - KafkaProducerTopic: clusterTopic.StatusTopic, - MessageCompressionType: string(operatorconstants.GzipCompressType), - TransportType: string(transport.Kafka), - LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), - RenewDeadline: strconv.Itoa(electionConfig.RenewDeadline), - RetryPeriod: strconv.Itoa(electionConfig.RetryPeriod), - KlusterletNamespace: "open-cluster-management-agent", - KlusterletWorkSA: "klusterlet-work-sa", - EnableGlobalResource: a.operatorConfig.GlobalResourceEnabled, - AgentQPS: agentQPS, - AgentBurst: agentBurst, - LogLevel: a.operatorConfig.LogLevel, - EnablePprof: a.operatorConfig.EnablePprof, - Resources: agentRes, + HoHAgentImage: image, + ImagePullPolicy: string(imagePullPolicy), + LeafHubID: cluster.Name, + TransportSecretName: constants.GHAgentTransportSecret, + KafkaClusterID: kafkaConnection.Identity, + KafkaBootstrapServer: kafkaConnection.BootstrapServer, + KafkaCACert: kafkaConnection.CACert, + KafkaClientCert: kafkaConnection.ClientCert, + KafkaClientKey: kafkaConnection.ClientKey, + KafkaClientCertSecret: base64.StdEncoding.EncodeToString([]byte(commonutils.AgentCertificateSecretName())), + KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.SpecTopic)), + KafkaStatusTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.StatusTopic)), + TransportType: base64.StdEncoding.EncodeToString([]byte(transport.Kafka)), + LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), + RenewDeadline: strconv.Itoa(electionConfig.RenewDeadline), + RetryPeriod: strconv.Itoa(electionConfig.RetryPeriod), + KlusterletNamespace: "open-cluster-management-agent", + KlusterletWorkSA: "klusterlet-work-sa", + EnableGlobalResource: a.operatorConfig.GlobalResourceEnabled, + AgentQPS: agentQPS, + AgentBurst: agentBurst, + LogLevel: a.operatorConfig.LogLevel, + EnablePprof: a.operatorConfig.EnablePprof, + Resources: agentRes, } if err := a.setImagePullSecret(mgh, cluster, &manifestsConfig); err != nil { diff --git a/operator/pkg/controllers/addon/addon_installer.go b/operator/pkg/controllers/addon/addon_installer.go index 3a6cf4e6f..dfb4a0dad 100644 --- a/operator/pkg/controllers/addon/addon_installer.go +++ b/operator/pkg/controllers/addon/addon_installer.go @@ -100,7 +100,7 @@ func (r *AddonInstaller) reconclieAddonAndResources(ctx context.Context, cluster existingAddon := &v1alpha1.ManagedClusterAddOn{ ObjectMeta: metav1.ObjectMeta{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: cluster.Name, }, } @@ -157,7 +157,7 @@ func (r *AddonInstaller) removeResourcesAndAddon(ctx context.Context, cluster *c // should remove the addon first, otherwise it mightn't update the mainfiest work for the addon existingAddon := &v1alpha1.ManagedClusterAddOn{ ObjectMeta: metav1.ObjectMeta{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: cluster.Name, }, } @@ -183,7 +183,7 @@ func (r *AddonInstaller) removeResourcesAndAddon(ctx context.Context, cluster *c func expectedManagedClusterAddon(cluster *clusterv1.ManagedCluster) (*v1alpha1.ManagedClusterAddOn, error) { expectedAddon := &v1alpha1.ManagedClusterAddOn{ ObjectMeta: metav1.ObjectMeta{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: cluster.Name, Labels: map[string]string{ constants.GlobalHubOwnerLabelKey: constants.GHOperatorOwnerLabelVal, @@ -241,7 +241,7 @@ func (r *AddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) return false }, UpdateFunc: func(e event.UpdateEvent) bool { - if e.ObjectNew.GetName() != operatorconstants.GHManagedClusterAddonName { + if e.ObjectNew.GetName() != constants.GHManagedClusterAddonName { return false } if e.ObjectNew.GetGeneration() == e.ObjectOld.GetGeneration() { @@ -250,16 +250,16 @@ func (r *AddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) return true }, DeleteFunc: func(e event.DeleteEvent) bool { - return e.Object.GetName() == operatorconstants.GHManagedClusterAddonName + return e.Object.GetName() == constants.GHManagedClusterAddonName }, } clusterManagementAddonPred := predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { - return e.Object.GetName() == operatorconstants.GHManagedClusterAddonName + return e.Object.GetName() == constants.GHManagedClusterAddonName }, UpdateFunc: func(e event.UpdateEvent) bool { - if e.ObjectNew.GetName() != operatorconstants.GHManagedClusterAddonName { + if e.ObjectNew.GetName() != constants.GHManagedClusterAddonName { return false } if e.ObjectNew.GetGeneration() == e.ObjectOld.GetGeneration() { @@ -268,7 +268,7 @@ func (r *AddonInstaller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) return true }, DeleteFunc: func(e event.DeleteEvent) bool { - return e.Object.GetName() == operatorconstants.GHManagedClusterAddonName + return e.Object.GetName() == constants.GHManagedClusterAddonName }, } diff --git a/operator/pkg/controllers/addon/addon_installer_test.go b/operator/pkg/controllers/addon/addon_installer_test.go index dcb60f6bb..32786648a 100644 --- a/operator/pkg/controllers/addon/addon_installer_test.go +++ b/operator/pkg/controllers/addon/addon_installer_test.go @@ -79,7 +79,7 @@ func fakeMGH(namespace, name string) *operatorv1alpha4.MulticlusterGlobalHub { func fakeHoHAddon(cluster, installNamespace, addonDeployMode string) *v1alpha1.ManagedClusterAddOn { addon := &v1alpha1.ManagedClusterAddOn{ ObjectMeta: metav1.ObjectMeta{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: cluster, }, Spec: v1alpha1.ManagedClusterAddOnSpec{ @@ -270,7 +270,7 @@ func TestAddonInstaller(t *testing.T) { } else { addon := &v1alpha1.ManagedClusterAddOn{} err = r.Get(context.TODO(), types.NamespacedName{ - Namespace: tc.cluster.Name, Name: operatorconstants.GHManagedClusterAddonName, + Namespace: tc.cluster.Name, Name: constants.GHManagedClusterAddonName, }, addon) if err != nil { if errors.IsNotFound(err) { diff --git a/operator/pkg/controllers/addon/certificates/csr.go b/operator/pkg/controllers/addon/certificates/csr.go index f891e3daa..02858ab9f 100644 --- a/operator/pkg/controllers/addon/certificates/csr.go +++ b/operator/pkg/controllers/addon/certificates/csr.go @@ -5,22 +5,17 @@ package certificates import ( - "fmt" - "strings" - addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" clusterv1 "open-cluster-management.io/api/cluster/v1" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/constants" ) -const SignerName = "open-cluster-management.io/globalhub-signer" - // default: https://github.com/open-cluster-management-io/addon-framework/blob/main/pkg/agent/inteface.go#L213 func SignerAndCsrConfigurations(cluster *clusterv1.ManagedCluster) []addonapiv1alpha1.RegistrationConfig { globalHubRegistrationConfig := addonapiv1alpha1.RegistrationConfig{ - SignerName: SignerName, + SignerName: constants.GHAddonCertSignerName, Subject: addonapiv1alpha1.Subject{ User: config.GetKafkaUserName(cluster.Name), // Groups: getGroups(cluster.Name, addonName), @@ -29,9 +24,3 @@ func SignerAndCsrConfigurations(cluster *clusterv1.ManagedCluster) []addonapiv1a registrationConfigs := []addonapiv1alpha1.RegistrationConfig{globalHubRegistrationConfig} return registrationConfigs } - -// https://github.com/open-cluster-management-io/ocm/blob/main/pkg/registration/spoke/addon/configuration.go -func AgentCertificateSecretName() string { - return fmt.Sprintf("%s-%s-client-cert", operatorconstants.GHManagedClusterAddonName, - strings.ReplaceAll(SignerName, "/", "-")) -} diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml index 6bf9f9dcc..0d0b9e360 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml @@ -38,16 +38,7 @@ spec: - --zap-log-level={{.LogLevel}} - --pod-namespace=$(POD_NAMESPACE) - --leaf-hub-name={{ .LeafHubID }} - - --kafka-consumer-id={{ .LeafHubID }} - --enforce-hoh-rbac=false - - --transport-type={{ .TransportType }} - - --kafka-bootstrap-server={{ .KafkaBootstrapServer }} - - --kafka-ca-cert-path=/kafka-cluster-ca/ca.crt - - --kafka-client-cert-path=/kafka-client-certs/tls.crt - - --kafka-client-key-path=/kafka-client-certs/tls.key - - --kafka-consumer-topic={{.KafkaConsumerTopic}} - - --kafka-producer-topic={{.KafkaProducerTopic}} - - --transport-message-compression-type={{.MessageCompressionType}} - --lease-duration={{.LeaseDuration}} - --renew-deadline={{.RenewDeadline}} - --retry-period={{.RetryPeriod}} @@ -66,13 +57,6 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace - volumeMounts: - - mountPath: /kafka-cluster-ca - name: kafka-cluster-ca - readOnly: true - - mountPath: /kafka-client-certs - name: kafka-client-certs - readOnly: true {{- if .ImagePullSecretName }} imagePullSecrets: - name: {{ .ImagePullSecretName }} @@ -91,11 +75,4 @@ spec: tolerationSeconds: {{.TolerationSeconds}} {{- end}} {{- end}} - volumes: - - name: kafka-cluster-ca - secret: - secretName: kafka-cluster-ca-cert - - name: kafka-client-certs - secret: - secretName: {{.KafkaClientCertSecret}} {{ end }} diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index a223796eb..9c220d43c 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -2,13 +2,19 @@ apiVersion: v1 kind: Secret metadata: - name: kafka-certs-secret + name: {{.TransportSecretName}} namespace: {{ .AddonInstallNamespace }} labels: addon.open-cluster-management.io/hosted-manifest-location: none type: Opaque data: + "id": "{{.KafkaClusterID}}" "ca.crt": "{{.KafkaCACert}}" "client.crt": "{{.KafkaClientCert}}" "client.key": "{{.KafkaClientKey}}" + "bootstrap_server": "{{.KafkaBootstrapServer}}" + "type": "{{.TransportType}}" + "status_topic": "{{.KafkaStatusTopic}}" + "spec_topic": "{{.KafkaSpecTopic}}" + "client_secret": "{{.KafkaClientCertSecret}}" {{- end -}} diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml deleted file mode 100644 index a6b997e76..000000000 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml +++ /dev/null @@ -1,12 +0,0 @@ -{{- if not .InstallHostedMode -}} -apiVersion: v1 -kind: Secret -metadata: - name: kafka-cluster-ca-cert - namespace: {{ .AddonInstallNamespace }} - labels: - addon.open-cluster-management.io/hosted-manifest-location: none -type: Opaque -data: - "ca.crt": "{{.KafkaCACert}}" -{{- end -}} \ No newline at end of file diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 73810b1dd..e194d3064 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -9,11 +9,15 @@ import ( "strconv" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" @@ -23,6 +27,7 @@ import ( "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/transport" + "github.com/stolostron/multicluster-global-hub/pkg/transport/controller" commonutils "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -36,6 +41,7 @@ var ( type ManagerReconciler struct { ctrl.Manager + runtimeClient client.Client kubeClient kubernetes.Interface operatorConfig *config.OperatorConfig } @@ -44,6 +50,7 @@ func NewManagerReconciler(mgr ctrl.Manager, kubeClient kubernetes.Interface, con ) *ManagerReconciler { return &ManagerReconciler{ Manager: mgr, + runtimeClient: mgr.GetClient(), kubeClient: kubeClient, operatorConfig: conf, } @@ -94,17 +101,25 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, return fmt.Errorf("the transport connection(%s) must not be empty", transportConn) } + if updateTransportConn(transportConn) { + err = r.ensureTransportSecret(ctx, mgh.Namespace, transportConn) + if err != nil { + return fmt.Errorf("failed to create/update manager secret: %w", err) + } + } + storageConn := config.GetStorageConnection() if storageConn == nil || !config.GetDatabaseReady() { return fmt.Errorf("the storage connection or database isn't ready") } - if isMiddlewareUpdated(transportConn, storageConn) { + if updateStorageConn(storageConn) { err = commonutils.RestartPod(ctx, r.kubeClient, mgh.Namespace, constants.ManagerDeploymentName) if err != nil { return fmt.Errorf("failed to restart manager pod: %w", err) } } + electionConfig, err := config.GetElectionConfig() if err != nil { return fmt.Errorf("failed to get the electionConfig %w", err) @@ -121,16 +136,8 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, DatabaseURL: base64.StdEncoding.EncodeToString( []byte(storageConn.SuperuserDatabaseURI)), PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert), - KafkaClusterIdentity: transportConn.Identity, - KafkaCACert: transportConn.CACert, - KafkaClientCert: transportConn.ClientCert, - KafkaClientKey: transportConn.ClientKey, - KafkaBootstrapServer: transportConn.BootstrapServer, - KafkaConsumerTopic: config.ManagerStatusTopic(), - KafkaProducerTopic: config.GetSpecTopic(), Namespace: mgh.Namespace, MessageCompressionType: string(operatorconstants.GzipCompressType), - TransportType: string(transport.Kafka), LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), RenewDeadline: strconv.Itoa(electionConfig.RenewDeadline), RetryPeriod: strconv.Itoa(electionConfig.RetryPeriod), @@ -157,31 +164,65 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, return nil } -func isMiddlewareUpdated(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) bool { - updated := false - if transportConnectionCache == nil || storageConnectionCache == nil { - updated = true - } - if !reflect.DeepEqual(transportConn, transportConnectionCache) { - updated = true - } - if !reflect.DeepEqual(storageConn, storageConnectionCache) { - updated = true +func updateTransportConn(conn *transport.ConnCredential) bool { + if conn == nil { + return false } - if updated { - setMiddlewareCache(transportConn, storageConn) + if transportConnectionCache == nil || !reflect.DeepEqual(conn, transportConnectionCache) { + transportConnectionCache = conn + return true } - return updated + return false } -func setMiddlewareCache(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) { - if transportConn != nil { - transportConnectionCache = transportConn +func updateStorageConn(conn *config.PostgresConnection) bool { + if conn == nil { + return false } + if storageConnectionCache == nil || !reflect.DeepEqual(conn, storageConnectionCache) { + storageConnectionCache = conn + return true + } + return false +} - if storageConn != nil { - storageConnectionCache = storageConn +func (r *ManagerReconciler) ensureTransportSecret(ctx context.Context, namespace string, + conn *transport.ConnCredential, +) error { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: constants.GHManagerTransportSecret, + Labels: map[string]string{ + constants.GlobalHubOwnerLabelKey: constants.GHOperatorOwnerLabelVal, + }, + }, + } + controller.LoadDataToSecret(secret, conn, &transport.ClusterTopic{ + StatusTopic: config.FuzzyStatusTopic(), + SpecTopic: config.GetSpecTopic(), + }, "") + + // Try to get the existing secret + existingSecret := &corev1.Secret{} + err := r.runtimeClient.Get(ctx, client.ObjectKeyFromObject(secret), existingSecret) + if err != nil && errors.IsNotFound(err) { + klog.Infof("create the manager secret: %s", secret.Name) + if err := r.runtimeClient.Create(ctx, secret); err != nil { + return err + } + } else if err != nil { + return err + } else { + if !reflect.DeepEqual(existingSecret.Data, secret.Data) { + existingSecret.Data = secret.Data + klog.Infof("update the manager secret: %s", secret.Name) + if err := r.runtimeClient.Update(ctx, existingSecret); err != nil { + return err + } + } } + return nil } type ManagerVariables struct { @@ -193,15 +234,7 @@ type ManagerVariables struct { ProxySessionSecret string DatabaseURL string PostgresCACert string - KafkaClusterIdentity string - KafkaCACert string - KafkaConsumerTopic string - KafkaProducerTopic string - KafkaClientCert string - KafkaClientKey string - KafkaBootstrapServer string MessageCompressionType string - TransportType string Namespace string LeaseDuration string RenewDeadline string diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml index 8c3db3365..bdca9fb7d 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml +++ b/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml @@ -37,16 +37,7 @@ spec: - --zap-log-level={{.LogLevel}} - --manager-namespace=$(POD_NAMESPACE) - --watch-namespace=$(WATCH_NAMESPACE) - - --transport-type={{.TransportType}} - - --kafka-bootstrap-server={{.KafkaBootstrapServer}} - - --kafka-cluster-identity={{.KafkaClusterIdentity}} - - --kafka-consumer-topic={{.KafkaConsumerTopic}} - - --kafka-producer-topic={{.KafkaProducerTopic}} - - --kafka-ca-cert-path=/kafka-certs/ca.crt - - --kafka-client-cert-path=/kafka-certs/client.crt - - --kafka-client-key-path=/kafka-certs/client.key - --postgres-ca-path=/postgres-credential/ca.crt - - --transport-message-compression-type={{.MessageCompressionType}} - --process-database-url=$(DATABASE_URL) - --transport-bridge-database-url=$(DATABASE_URL) - --lease-duration={{.LeaseDuration}} @@ -95,9 +86,6 @@ spec: name: webhook-certs readOnly: true {{- end }} - - mountPath: /kafka-certs - name: kafka-certs - readOnly: true - mountPath: /postgres-credential name: postgres-credential readOnly: true @@ -166,9 +154,6 @@ spec: {{- end}} {{- end}} volumes: - - name: kafka-certs - secret: - secretName: kafka-certs-secret - name: postgres-credential secret: secretName: postgres-credential-secret diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/kafka-certs-secret.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/kafka-certs-secret.yaml deleted file mode 100644 index fe6cb40c6..000000000 --- a/operator/pkg/controllers/hubofhubs/manager/manifests/kafka-certs-secret.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v1 -kind: Secret -metadata: - name: kafka-certs-secret - namespace: {{.Namespace}} - labels: - name: multicluster-global-hub-manager -type: Opaque -data: - "ca.crt": "{{.KafkaCACert}}" - "client.crt": "{{.KafkaClientCert}}" - "client.key": "{{.KafkaClientKey}}" \ No newline at end of file diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go index 899daf0df..ae3c15c45 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go @@ -63,8 +63,8 @@ func (s *BYOTransporter) GetConnCredential(username string) (*transport.ConnCred return nil, err } return &transport.ConnCredential{ - Identity: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]), - BootstrapServer: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]), + Identity: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("bootstrap_server")]), + BootstrapServer: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("bootstrap_server")]), CACert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("ca.crt")]), ClientCert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.crt")]), ClientKey: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.key")]), diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index 763823950..adcc56e9a 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -467,8 +467,8 @@ func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCreden clusterIdentity = *kafkaCluster.Status.ClusterId } credential := &transport.ConnCredential{ - Identity: clusterIdentity, - BootstrapServer: *kafkaCluster.Status.Listeners[1].BootstrapServers, + Identity: base64.StdEncoding.EncodeToString([]byte(clusterIdentity)), + BootstrapServer: base64.StdEncoding.EncodeToString([]byte(*kafkaCluster.Status.Listeners[1].BootstrapServers)), CACert: base64.StdEncoding.EncodeToString([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])), } return credential, nil diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 660b290aa..af9cd5283 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -63,12 +63,24 @@ const ( PostgresCAConfigMap = "multicluster-global-hub-postgres-ca" ) +// the global hub transport secret for manager and agent +const ( + GHManagerTransportSecret = "multicluster-global-hub-manager" // #nosec G101 + GHAgentTransportSecret = "multicluster-global-hub-agent" // #nosec G101 +) + // global hub console secret/configmap names const ( CustomAlertName = "multicluster-global-hub-custom-alerting" CustomGrafanaIniName = "multicluster-global-hub-custom-grafana-config" ) +// global hub addon +const ( + GHManagedClusterAddonName = "multicluster-global-hub-controller" + GHAddonCertSignerName = "open-cluster-management.io/globalhub-signer" +) + const ( // identify the resource is managed by GlobalHubOwnerLabelKey = "global-hub.open-cluster-management.io/managed-by" diff --git a/pkg/transport/config/config_test.go b/pkg/transport/config/config_test.go index c8ccca323..a9fa62709 100644 --- a/pkg/transport/config/config_test.go +++ b/pkg/transport/config/config_test.go @@ -1,8 +1,6 @@ package config import ( - "errors" - "os" "strings" "testing" @@ -20,19 +18,26 @@ func TestConfluentConfig(t *testing.T) { { desc: "kafka config with tls", kafkaConfig: &transport.KafkaConfig{ - BootstrapServer: "localhost:9092", - EnableTLS: true, - CaCertPath: "/tmp/ca.crt", - ClientCertPath: "/tmp/client.crt", - ClientKeyPath: "/tmp/client.key", + ConnCredential: &transport.ConnCredential{ + BootstrapServer: "localhost:9092", + CACert: "", + ClientCert: "clientca", + ClientKey: "clientkey", + }, + EnableTLS: true, }, - expectedErr: errors.New("failed to append ca certificate"), + expectedErr: nil, }, { desc: "kafka config without tls", kafkaConfig: &transport.KafkaConfig{ - BootstrapServer: "localhost:9092", - EnableTLS: false, + ConnCredential: &transport.ConnCredential{ + BootstrapServer: "localhost:9092", + CACert: "", + ClientCert: "clientca", + ClientKey: "clientkey", + }, + EnableTLS: false, }, expectedErr: nil, }, @@ -40,15 +45,6 @@ func TestConfluentConfig(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - if tc.kafkaConfig.CaCertPath != "" { - assert.Nil(t, os.WriteFile(tc.kafkaConfig.CaCertPath, []byte("cadata"), 0o644)) - } - if tc.kafkaConfig.ClientCertPath != "" { - assert.Nil(t, os.WriteFile(tc.kafkaConfig.ClientCertPath, []byte("certdata"), 0o644)) - } - if tc.kafkaConfig.ClientKeyPath != "" { - assert.Nil(t, os.WriteFile(tc.kafkaConfig.ClientKeyPath, []byte("keydata"), 0o644)) - } _, err := GetConfluentConfigMap(tc.kafkaConfig, true) if tc.expectedErr != nil { assert.Equal(t, err.Error(), tc.expectedErr.Error()) @@ -61,10 +57,13 @@ func TestConfluentConfig(t *testing.T) { func TestGetSaramaConfig(t *testing.T) { kafkaConfig := &transport.KafkaConfig{ - EnableTLS: false, - ClientCertPath: "/tmp/client.crt", - ClientKeyPath: "/tmp/client.key", - CaCertPath: "/tmp/ca.crt", + ConnCredential: &transport.ConnCredential{ + BootstrapServer: "localhost:9092", + CACert: "", + ClientCert: "clientca", + ClientKey: "clientkey", + }, + EnableTLS: false, } _, err := GetSaramaConfig(kafkaConfig) if err != nil { @@ -72,9 +71,6 @@ func TestGetSaramaConfig(t *testing.T) { } kafkaConfig.EnableTLS = true - if er := os.WriteFile(kafkaConfig.CaCertPath, []byte("test"), 0o644); er != nil { // #nosec G304 - t.Errorf("failed to write cert file - %v", er) - } _, err = GetSaramaConfig(kafkaConfig) if err != nil && !strings.Contains(err.Error(), "failed to find any PEM data in certificate") { t.Errorf("failed to get sarama config - %v", err) diff --git a/pkg/transport/config/confluent_config.go b/pkg/transport/config/confluent_config.go index 820f931ee..263b3fa28 100644 --- a/pkg/transport/config/confluent_config.go +++ b/pkg/transport/config/confluent_config.go @@ -25,7 +25,7 @@ func GetBasicConfigMap() *kafkav2.ConfigMap { func SetProducerConfig(kafkaConfigMap *kafkav2.ConfigMap) { _ = kafkaConfigMap.SetKey("go.produce.channel.size", 1000) _ = kafkaConfigMap.SetKey("acks", "1") - _ = kafkaConfigMap.SetKey("retries", "0") + _ = kafkaConfigMap.SetKey("retries", "3") _ = kafkaConfigMap.SetKey("go.events.channel.size", 1000) } @@ -76,10 +76,24 @@ func SetTLSByLocation(kafkaConfigMap *kafkav2.ConfigMap, caCertPath, certPath, k return nil } +func SetTLSByRawData(kafkaConfigMap *kafkav2.ConfigMap, caCrt, clientCrt, clientKey string) error { + _ = kafkaConfigMap.SetKey("security.protocol", "ssl") + if err := kafkaConfigMap.SetKey("ssl.ca.pem", caCrt); err != nil { + return err + } + if err := kafkaConfigMap.SetKey("ssl.certificate.pem", clientCrt); err != nil { + return err + } + if err := kafkaConfigMap.SetKey("ssl.key.pem", clientKey); err != nil { + return err + } + return nil +} + // https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*kafkav2.ConfigMap, error) { kafkaConfigMap := GetBasicConfigMap() - _ = kafkaConfigMap.SetKey("bootstrap.servers", kafkaConfig.BootstrapServer) + _ = kafkaConfigMap.SetKey("bootstrap.servers", kafkaConfig.ConnCredential.BootstrapServer) if producer { SetProducerConfig(kafkaConfigMap) } else { @@ -88,7 +102,11 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (* if !kafkaConfig.EnableTLS { return kafkaConfigMap, nil } - err := SetTLSByLocation(kafkaConfigMap, kafkaConfig.CaCertPath, kafkaConfig.ClientCertPath, kafkaConfig.ClientKeyPath) + + err := SetTLSByRawData(kafkaConfigMap, + kafkaConfig.ConnCredential.CACert, + kafkaConfig.ConnCredential.ClientCert, + kafkaConfig.ConnCredential.ClientKey) if err != nil { return nil, err } diff --git a/pkg/transport/config/saram_config.go b/pkg/transport/config/saram_config.go index fd559cdd2..32fd61483 100644 --- a/pkg/transport/config/saram_config.go +++ b/pkg/transport/config/saram_config.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "crypto/x509" "os" - "path/filepath" "github.com/Shopify/sarama" @@ -16,12 +15,13 @@ func GetSaramaConfig(kafkaConfig *transport.KafkaConfig) (*sarama.Config, error) saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 - _, validCa := utils.Validate(kafkaConfig.CaCertPath) - if kafkaConfig.EnableTLS && validCa { + if kafkaConfig.EnableTLS { var err error saramaConfig.Net.TLS.Enable = true - saramaConfig.Net.TLS.Config, err = NewTLSConfig(kafkaConfig.ClientCertPath, kafkaConfig.ClientKeyPath, - kafkaConfig.CaCertPath) + saramaConfig.Net.TLS.Config, err = NewTLSConfig( + kafkaConfig.ConnCredential.CACert, + kafkaConfig.ConnCredential.ClientCert, + kafkaConfig.ConnCredential.ClientKey) if err != nil { return nil, err } @@ -29,14 +29,24 @@ func GetSaramaConfig(kafkaConfig *transport.KafkaConfig) (*sarama.Config, error) return saramaConfig, nil } -func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { +func NewTLSConfig(caCert, clientCert, clientKey string) (*tls.Config, error) { // #nosec G402 tlsConfig := tls.Config{} // Load client cert + clientCertFile := "/tmp/kafka_client.crt" + clientKeyFile := "/tmp/kafka_client.key" + if err := os.WriteFile(clientCertFile, []byte(clientCert), 0o644); err != nil { + return nil, err + } + if err := os.WriteFile(clientKeyFile, []byte(clientKey), 0o644); err != nil { + return nil, err + } + _, validCert := utils.Validate(clientCertFile) _, validKey := utils.Validate(clientKeyFile) if validCert && validKey { + cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) if err != nil { return &tlsConfig, err @@ -48,14 +58,10 @@ func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config } // Load CA cert - caCert, err := os.ReadFile(filepath.Clean(caCertFile)) - if err != nil { - return &tlsConfig, err - } caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) + caCertPool.AppendCertsFromPEM([]byte(caCert)) tlsConfig.RootCAs = caCertPool tlsConfig.BuildNameToCertificate() - return &tlsConfig, err + return &tlsConfig, nil } diff --git a/pkg/transport/consumer/generic_consumer.go b/pkg/transport/consumer/generic_consumer.go index dad9de0bb..014caa311 100644 --- a/pkg/transport/consumer/generic_consumer.go +++ b/pkg/transport/consumer/generic_consumer.go @@ -26,13 +26,18 @@ import ( var transportID string type GenericConsumer struct { - log logr.Logger - client cloudevents.Client - assembler *messageAssembler - eventChan chan *cloudevents.Event + log logr.Logger + + assembler *messageAssembler + eventChan chan *cloudevents.Event + consumerOpts []GenericConsumeOption + consumeTopics []string clusterIdentity string enableDatabaseOffset bool + consumerCtx context.Context + consumerCancel context.CancelFunc + client cloudevents.Client } type GenericConsumeOption func(*GenericConsumer) error @@ -47,55 +52,15 @@ func EnableDatabaseOffset(enableOffset bool) GenericConsumeOption { func NewGenericConsumer(tranConfig *transport.TransportConfig, topics []string, opts ...GenericConsumeOption, ) (*GenericConsumer, error) { - log := ctrl.Log.WithName(fmt.Sprintf("%s-consumer", tranConfig.TransportType)) - var receiver interface{} - var err error - var clusterIdentity string - switch tranConfig.TransportType { - case string(transport.Kafka): - log.Info("transport consumer with cloudevents-kafka receiver") - receiver, err = getConfluentReceiverProtocol(tranConfig, topics) - if err != nil { - return nil, err - } - clusterIdentity = tranConfig.KafkaConfig.ClusterIdentity - case string(transport.Chan): - log.Info("transport consumer with go chan receiver") - if tranConfig.Extends == nil { - tranConfig.Extends = make(map[string]interface{}) - } - topic := "event" - if topics != nil && len(topics) > 0 { - topic = topics[0] - } - if _, found := tranConfig.Extends[topic]; !found { - tranConfig.Extends[topic] = gochan.New() - } - receiver = tranConfig.Extends[topic] - clusterIdentity = "kafka-cluster-chan" - default: - return nil, fmt.Errorf("transport-type - %s is not a valid option", tranConfig.TransportType) - } - - client, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) - if err != nil { - return nil, err - } - c := &GenericConsumer{ - log: log, - client: client, - clusterIdentity: clusterIdentity, + log: ctrl.Log.WithName(fmt.Sprintf("%s-consumer", tranConfig.TransportType)), + consumerOpts: opts, + consumeTopics: topics, eventChan: make(chan *cloudevents.Event), assembler: newMessageAssembler(), enableDatabaseOffset: false, - consumeTopics: topics, } - if err := c.applyOptions(opts...); err != nil { - return nil, err - } - transportID = clusterIdentity - return c, nil + return c, c.initClient(tranConfig, topics) } func (c *GenericConsumer) applyOptions(opts ...GenericConsumeOption) error { @@ -148,6 +113,67 @@ func (c *GenericConsumer) EventChan() chan *cloudevents.Event { return c.eventChan } +func (c *GenericConsumer) Reconnect(ctx context.Context, tranConfig *transport.TransportConfig, topics []string) error { + err := c.initClient(tranConfig, topics) + if err != nil { + return err + } + + // close the previous consumer + if c.consumerCancel != nil { + c.consumerCancel() + } + c.consumerCtx, c.consumerCancel = context.WithCancel(ctx) + go func() { + if err := c.Start(c.consumerCtx); err != nil { + c.log.Error(err, "failed to reconnect(start) the consumer") + } + }() + return nil +} + +// initClient will init the consumer identity, clientProtocol, client +func (c *GenericConsumer) initClient(tranConfig *transport.TransportConfig, topics []string) error { + var err error + var clientProtocol interface{} + switch tranConfig.TransportType { + case string(transport.Kafka): + c.log.Info("transport consumer with cloudevents-kafka receiver") + c.clusterIdentity = tranConfig.KafkaConfig.ConnCredential.Identity + clientProtocol, err = getConfluentReceiverProtocol(tranConfig, topics) + if err != nil { + return err + } + case string(transport.Chan): + c.log.Info("transport consumer with go chan receiver") + if tranConfig.Extends == nil { + tranConfig.Extends = make(map[string]interface{}) + } + topic := "event" + if topics != nil && len(topics) > 0 { + topic = topics[0] + } + if _, found := tranConfig.Extends[topic]; !found { + tranConfig.Extends[topic] = gochan.New() + } + clientProtocol = tranConfig.Extends[topic] + c.clusterIdentity = "kafka-cluster-chan" + default: + return fmt.Errorf("transport-type - %s is not a valid option", tranConfig.TransportType) + } + + c.client, err = cloudevents.NewClient(clientProtocol, client.WithPollGoroutines(1)) + if err != nil { + return err + } + + // override by the options + if err := c.applyOptions(c.consumerOpts...); err != nil { + return err + } + return nil +} + func getInitOffset(kafkaClusterIdentity string) ([]kafka.TopicPartition, error) { db := database.GetGorm() var positions []models.Transport diff --git a/pkg/transport/consumer/generic_consumer_test.go b/pkg/transport/consumer/generic_consumer_test.go index 8e5805da9..a7368f2af 100644 --- a/pkg/transport/consumer/generic_consumer_test.go +++ b/pkg/transport/consumer/generic_consumer_test.go @@ -24,8 +24,10 @@ func TestGenerateConsumer(t *testing.T) { transportConfig := &transport.TransportConfig{ TransportType: "kafka", KafkaConfig: &transport.KafkaConfig{ - BootstrapServer: mockKafkaCluster.BootstrapServers(), - EnableTLS: false, + ConnCredential: &transport.ConnCredential{ + BootstrapServer: mockKafkaCluster.BootstrapServers(), + }, + EnableTLS: false, ConsumerConfig: &transport.KafkaConsumerConfig{ ConsumerID: "test-consumer", }, diff --git a/pkg/transport/consumer/sarama_consumer.go b/pkg/transport/consumer/sarama_consumer.go index 7daa4801f..0667f8325 100644 --- a/pkg/transport/consumer/sarama_consumer.go +++ b/pkg/transport/consumer/sarama_consumer.go @@ -38,7 +38,8 @@ func NewSaramaConsumer(ctx context.Context, kafkaConfig *transport.KafkaConfig, saramaConfig.Consumer.Offsets.AutoCommit.Enable = true saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest - client, err := sarama.NewConsumerGroup([]string{kafkaConfig.BootstrapServer}, kafkaConfig.ConsumerConfig.ConsumerID, + client, err := sarama.NewConsumerGroup([]string{kafkaConfig.ConnCredential.BootstrapServer}, + kafkaConfig.ConsumerConfig.ConsumerID, saramaConfig) if err != nil { return nil, err diff --git a/pkg/transport/consumer/sarama_consumer_test.go b/pkg/transport/consumer/sarama_consumer_test.go index 3d67517d9..d0a6e2558 100644 --- a/pkg/transport/consumer/sarama_consumer_test.go +++ b/pkg/transport/consumer/sarama_consumer_test.go @@ -25,8 +25,10 @@ func TestConsumerGroup(t *testing.T) { defer kafkaCluster.Close() kafkaConfig := &transport.KafkaConfig{ - BootstrapServer: kafkaCluster.Addr(), - EnableTLS: false, + ConnCredential: &transport.ConnCredential{ + BootstrapServer: kafkaCluster.Addr(), + }, + EnableTLS: false, ConsumerConfig: &transport.KafkaConsumerConfig{ ConsumerID: "test-consumer", }, @@ -52,7 +54,7 @@ func TestConsumerGroup(t *testing.T) { saramaConfig.Consumer.Offsets.AutoCommit.Enable = true saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest - group, err := sarama.NewConsumerGroup([]string{kafkaConfig.BootstrapServer}, "my-group", saramaConfig) + group, err := sarama.NewConsumerGroup([]string{kafkaConfig.ConnCredential.BootstrapServer}, "my-group", saramaConfig) if err != nil { t.Fatal(err) } diff --git a/pkg/transport/controller/controller.go b/pkg/transport/controller/controller.go new file mode 100644 index 000000000..9ace21b91 --- /dev/null +++ b/pkg/transport/controller/controller.go @@ -0,0 +1,214 @@ +package controller + +import ( + "context" + "fmt" + "reflect" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/stolostron/multicluster-global-hub/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/transport" + "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" + "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" + "github.com/stolostron/multicluster-global-hub/pkg/utils" +) + +type TransportCallback func(producer transport.Producer, consumer transport.Consumer) error + +type TransportCtrl struct { + secretNamespace string + secretName string + kafkaConfig *transport.KafkaConfig + // the use the producer and consumer to activate the call back funciton, once it executed successful, then clear it. + callback TransportCallback + + runtimeClient client.Client + consumer transport.Consumer + producer transport.Producer +} + +func NewTransportCtrl(secretNamespace, secretName string, kafkaConfig *transport.KafkaConfig, + callback TransportCallback, +) *TransportCtrl { + return &TransportCtrl{ + secretNamespace: secretNamespace, + secretName: secretName, + kafkaConfig: kafkaConfig, + callback: callback, + } +} + +func (c *TransportCtrl) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: c.secretNamespace, + Name: c.secretName, + }, + } + if err := c.runtimeClient.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil { + return ctrl.Result{}, err + } + + // it the client certs generated on the managed hub clusters + var clientSecret *corev1.Secret + clientSecretName := string(secret.Data["client_secret"]) + if clientSecretName != "" { + clientSecretName = utils.AgentCertificateSecretName() + } + clientSecret = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: c.secretNamespace, + Name: clientSecretName, + }, + } + err := c.runtimeClient.Get(ctx, client.ObjectKeyFromObject(clientSecret), clientSecret) + if err != nil && !errors.IsNotFound(err) { + return ctrl.Result{}, err + } + + if !c.updateKafkaConfig(secret, clientSecret) { + return ctrl.Result{}, nil + } + + // create the new consumer/producer and close the previous consumer/producer + sendTopic := c.kafkaConfig.Topics.SpecTopic + receiveTopic := c.kafkaConfig.Topics.StatusTopic + databaseOffset := true + if c.secretName != constants.GHManagerTransportSecret { + sendTopic = c.kafkaConfig.Topics.StatusTopic + receiveTopic = c.kafkaConfig.Topics.SpecTopic + databaseOffset = false + } + transportConfig := &transport.TransportConfig{ + KafkaConfig: c.kafkaConfig, + TransportType: string(transport.Kafka), + } + + if c.producer != nil { + if err := c.producer.Reconnect(transportConfig); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to reconnect the producer: %w", err) + } + } else { + sender, err := producer.NewGenericProducer(transportConfig, sendTopic) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create/update the producer: %w", err) + } + c.producer = sender + } + + if c.consumer != nil { + if err := c.consumer.Reconnect(ctx, transportConfig, []string{receiveTopic}); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to reconnect the consumer: %w", err) + } + } else { + receiver, err := consumer.NewGenericConsumer(transportConfig, []string{receiveTopic}, + consumer.EnableDatabaseOffset(databaseOffset)) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create the consumer: %w", err) + } + c.consumer = receiver + go func() { + if err = c.consumer.Start(ctx); err != nil { + klog.Errorf("failed to start the consumser: %v", err) + } + }() + } + klog.Info("the transport secret(producer, consumer) is created/updated") + + if c.callback == nil { + if err := c.callback(c.producer, c.consumer); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to invoke the callback function: %w", err) + } + c.callback = nil + } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (c *TransportCtrl) SetupWithManager(ctx context.Context, mgr ctrl.Manager, secretNames []string) error { + c.runtimeClient = mgr.GetClient() + cond := func(obj client.Object) bool { + for _, name := range secretNames { + if obj.GetName() == name { + return true + } + } + return false + } + secretPred := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return cond(e.Object) + }, + UpdateFunc: func(e event.UpdateEvent) bool { + if !cond(e.ObjectNew) { + return false + } + newSecret := e.ObjectNew.(*corev1.Secret) + oldSecret := e.ObjectOld.(*corev1.Secret) + // only enqueue the obj when secret data changed + return !reflect.DeepEqual(newSecret.Data, oldSecret.Data) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + } + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Secret{}, builder.WithPredicates(secretPred)). + Complete(c) +} + +// updateKafkaConfig update the current config by secret, and validate the configuration +func (c *TransportCtrl) updateKafkaConfig(secret, clientSecret *corev1.Secret) bool { + update := false + expectedConn := &transport.ConnCredential{ + Identity: string(secret.Data["id"]), + BootstrapServer: string(secret.Data["bootstrap_server"]), + CACert: string(secret.Data["ca.crt"]), + ClientCert: string(secret.Data["client.crt"]), + ClientKey: string(secret.Data["client.key"]), + } + // type := string(secret.Data["type"]) + if clientSecret != nil && clientSecret.Data != nil { + expectedConn.ClientCert = string(clientSecret.Data["tls.crt"]) + expectedConn.ClientKey = string(clientSecret.Data["tls.key"]) + } + expectedTopics := &transport.ClusterTopic{ + SpecTopic: string(secret.Data["spec_topic"]), + StatusTopic: string(secret.Data["status_topic"]), + } + if !reflect.DeepEqual(expectedConn, c.kafkaConfig.ConnCredential) { + c.kafkaConfig.ConnCredential = expectedConn + update = true + } + if !reflect.DeepEqual(expectedTopics, c.kafkaConfig.Topics) { + c.kafkaConfig.Topics = expectedTopics + update = true + } + return update +} + +func LoadDataToSecret(secret *corev1.Secret, conn *transport.ConnCredential, topics *transport.ClusterTopic, + clientSecret string, +) { + secret.Data = map[string][]byte{ + "id": []byte(conn.Identity), + "bootstrap_server": []byte(conn.BootstrapServer), + "ca.crt": []byte(conn.CACert), + "client.crt": []byte(conn.ClientCert), + "client.key": []byte(conn.ClientKey), + "type": []byte(transport.Kafka), + "status_topic": []byte(topics.StatusTopic), + "spec_topic": []byte(topics.SpecTopic), + "client_secret": []byte(clientSecret), // first get the client cert/key from this secret + } +} diff --git a/pkg/transport/producer/generic_producer.go b/pkg/transport/producer/generic_producer.go index 9f7537d2a..4a43aa353 100644 --- a/pkg/transport/producer/generic_producer.go +++ b/pkg/transport/producer/generic_producer.go @@ -10,6 +10,7 @@ import ( kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/protocol/gochan" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-logr/logr" @@ -26,54 +27,20 @@ const ( type GenericProducer struct { log logr.Logger + defaultTopic string + clientPotocol interface{} client cloudevents.Client messageSizeLimit int } func NewGenericProducer(transportConfig *transport.TransportConfig, defaultTopic string) (*GenericProducer, error) { - var sender interface{} - var err error - messageSize := DefaultMessageKBSize * 1000 - log := ctrl.Log.WithName(fmt.Sprintf("%s-producer", transportConfig.TransportType)) - - switch transportConfig.TransportType { - case string(transport.Kafka): - if transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > 0 { - messageSize = transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB * 1000 - } - kafkaProtocol, err := getConfluentSenderProtocol(transportConfig, defaultTopic) - if err != nil { - return nil, err - } - - eventChan, err := kafkaProtocol.Events() - if err != nil { - return nil, err - } - handleProducerEvents(log, eventChan) - sender = kafkaProtocol - case string(transport.Chan): // this go chan protocol is only use for test - if transportConfig.Extends == nil { - transportConfig.Extends = make(map[string]interface{}) - } - if _, found := transportConfig.Extends[defaultTopic]; !found { - transportConfig.Extends[defaultTopic] = gochan.New() - } - sender = transportConfig.Extends[defaultTopic] - default: - return nil, fmt.Errorf("transport-type - %s is not a valid option", transportConfig.TransportType) - } - - client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) - if err != nil { - return nil, err + genericProducer := &GenericProducer{ + log: ctrl.Log.WithName(fmt.Sprintf("%s-producer", transportConfig.TransportType)), + messageSizeLimit: DefaultMessageKBSize * 1000, + defaultTopic: defaultTopic, } - - return &GenericProducer{ - log: log, - client: client, - messageSizeLimit: messageSize, - }, nil + err := genericProducer.initClient(transportConfig) + return genericProducer, err } func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error { @@ -108,6 +75,55 @@ func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event) return nil } +// Reconnect close the previous producer state and init a new producer +func (p *GenericProducer) Reconnect(config *transport.TransportConfig) error { + closer, ok := p.clientPotocol.(protocol.Closer) + if ok { + if err := closer.Close(context.Background()); err != nil { + return fmt.Errorf("failed to close the previous producer: %w", err) + } + } + return p.initClient(config) +} + +// initClient will init/update the client, clientProtocol and messageLimitSize based on the transportConfig +func (p *GenericProducer) initClient(transportConfig *transport.TransportConfig) error { + switch transportConfig.TransportType { + case string(transport.Kafka): + if transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > 0 { + p.messageSizeLimit = transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB * 1000 + } + kafkaProtocol, err := getConfluentSenderProtocol(transportConfig, p.defaultTopic) + if err != nil { + return err + } + + eventChan, err := kafkaProtocol.Events() + if err != nil { + return err + } + handleProducerEvents(p.log, eventChan) + p.clientPotocol = kafkaProtocol + case string(transport.Chan): // this go chan protocol is only use for test + if transportConfig.Extends == nil { + transportConfig.Extends = make(map[string]interface{}) + } + if _, found := transportConfig.Extends[p.defaultTopic]; !found { + transportConfig.Extends[p.defaultTopic] = gochan.New() + } + p.clientPotocol = transportConfig.Extends[p.defaultTopic] + default: + return fmt.Errorf("transport-type - %s is not a valid option", transportConfig.TransportType) + } + + client, err := cloudevents.NewClient(p.clientPotocol, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + return err + } + p.client = client + return nil +} + func (p *GenericProducer) splitPayloadIntoChunks(payload []byte) [][]byte { var chunk []byte chunks := make([][]byte, 0, len(payload)/(p.messageSizeLimit)+1) @@ -133,7 +149,7 @@ func getSaramaSenderProtocol(transportConfig *transport.TransportConfig, default // set max message bytes to 1 MB: 1000 000 > config.ProducerConfig.MessageSizeLimitKB * 1000 saramaConfig.Producer.MaxMessageBytes = MaxMessageKBLimit * 1000 saramaConfig.Producer.Return.Successes = true - sender, err := kafka_sarama.NewSender([]string{transportConfig.KafkaConfig.BootstrapServer}, + sender, err := kafka_sarama.NewSender([]string{transportConfig.KafkaConfig.ConnCredential.BootstrapServer}, saramaConfig, defaultTopic) if err != nil { return nil, err diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 021f7dd3f..b0f8b198d 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -11,12 +11,14 @@ import ( type Producer interface { SendEvent(ctx context.Context, evt cloudevents.Event) error + Reconnect(config *TransportConfig) error } type Consumer interface { // start the transport to consume message Start(ctx context.Context) error EventChan() chan *cloudevents.Event + Reconnect(ctx context.Context, config *TransportConfig, topics []string) error } // init the transport with different implementation/protocol: secret, strimzi operator or plain deployment @@ -27,7 +29,6 @@ type Transporter interface { EnsureTopic(clusterName string) (*ClusterTopic, error) // Cleanup will delete the user or topic for the cluster Prune(clusterName string) error - // get the connection credential by user GetConnCredential(clusterName string) (*ConnCredential, error) } diff --git a/pkg/transport/type.go b/pkg/transport/type.go index 358bf3f62..652c9a3bc 100644 --- a/pkg/transport/type.go +++ b/pkg/transport/type.go @@ -41,24 +41,18 @@ const ( ) type TransportConfig struct { - TransportType string - MessageCompressionType string - CommitterInterval time.Duration - KafkaConfig *KafkaConfig - Extends map[string]interface{} + TransportType string + KafkaConfig *KafkaConfig + Extends map[string]interface{} } // Kafka Config type KafkaConfig struct { - ClusterIdentity string - BootstrapServer string - CaCertPath string - ClientCertPath string - ClientKeyPath string - EnableTLS bool - Topics *ClusterTopic - ProducerConfig *KafkaProducerConfig - ConsumerConfig *KafkaConsumerConfig + ConnCredential *ConnCredential + EnableTLS bool + Topics *ClusterTopic + ProducerConfig *KafkaProducerConfig + ConsumerConfig *KafkaConsumerConfig } type KafkaProducerConfig struct { @@ -67,7 +61,8 @@ type KafkaProducerConfig struct { } type KafkaConsumerConfig struct { - ConsumerID string + ConsumerID string + CommitterInterval time.Duration } // topics diff --git a/pkg/utils/object.go b/pkg/utils/object.go index 38d3d418c..ae737bab5 100644 --- a/pkg/utils/object.go +++ b/pkg/utils/object.go @@ -13,6 +13,8 @@ import ( clusterv1 "open-cluster-management.io/api/cluster/v1" policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/stolostron/multicluster-global-hub/pkg/constants" ) const ( @@ -107,3 +109,9 @@ func PrettyPrint(obj interface{}) { fmt.Println(string(payload)) } } + +// https://github.com/open-cluster-management-io/ocm/blob/main/pkg/registration/spoke/addon/configuration.go +func AgentCertificateSecretName() string { + return fmt.Sprintf("%s-%s-client-cert", constants.GHManagedClusterAddonName, + strings.ReplaceAll(constants.GHAddonCertSignerName, "/", "-")) +} diff --git a/samples/config/confluent_config.go b/samples/config/confluent_config.go index 21c922fb8..b499dd3da 100644 --- a/samples/config/confluent_config.go +++ b/samples/config/confluent_config.go @@ -9,9 +9,9 @@ import ( kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" "github.com/confluentinc/confluent-kafka-go/v2/kafka" operatorconfig "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/transport/config" + "github.com/stolostron/multicluster-global-hub/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" @@ -30,35 +30,14 @@ func GetConfluentConfigMapBySecret(isProducer bool) (*kafka.ConfigMap, error) { log.Fatalf("failed to get transport secret: %v", err) return nil, err } - bootStrapServer := string(secret.Data["bootstrap_server"]) - - caCrtPath := "/tmp/ca.crt" - err = os.WriteFile(caCrtPath, secret.Data["ca.crt"], 0o600) - if err != nil { - log.Fatalf("failed to write ca.crt: %v", err) - return nil, err - } - - clientCrtPath := "/tmp/client.crt" - err = os.WriteFile(clientCrtPath, secret.Data["client.crt"], 0o600) - if err != nil { - log.Fatalf("failed to write client.crt: %v", err) - return nil, err - } - - clientKeyPath := "/tmp/client.key" - err = os.WriteFile(clientKeyPath, secret.Data["client.key"], 0o600) - if err != nil { - log.Fatalf("failed to write client.key: %v", err) - return nil, err - } - kafkaConfig := &transport.KafkaConfig{ - BootstrapServer: bootStrapServer, - EnableTLS: true, - CaCertPath: caCrtPath, - ClientCertPath: clientCrtPath, - ClientKeyPath: clientKeyPath, + ConnCredential: &transport.ConnCredential{ + BootstrapServer: string(secret.Data["bootstrap_server"]), + CACert: string(secret.Data["ca.crt"]), + ClientCert: string(secret.Data["client.crt"]), + ClientKey: string(secret.Data["client.key"]), + }, + EnableTLS: true, } configMap, err := config.GetConfluentConfigMap(kafkaConfig, isProducer) if err != nil { @@ -99,7 +78,7 @@ func GetConfluentConfigMapFromManagedHub(producer bool) (*kafka.ConfigMap, error clientCertSecret := &corev1.Secret{} err = c.Get(context.TODO(), types.NamespacedName{ - Name: certificates.AgentCertificateSecretName(), + Name: utils.AgentCertificateSecretName(), Namespace: namespace, }, clientCertSecret) if err != nil { @@ -107,20 +86,6 @@ func GetConfluentConfigMapFromManagedHub(producer bool) (*kafka.ConfigMap, error } fmt.Println(">> client secret:", clientCertSecret.Name) - clientCrtPath := "/tmp/client.crt" - err = os.WriteFile(clientCrtPath, clientCertSecret.Data["tls.crt"], 0o600) - if err != nil { - log.Fatalf("failed to write client.crt: %v", err) - return nil, err - } - - clientKeyPath := "/tmp/client.key" - err = os.WriteFile(clientKeyPath, clientCertSecret.Data["tls.key"], 0o600) - if err != nil { - log.Fatalf("failed to write client.key: %v", err) - return nil, err - } - caCertSecret := &corev1.Secret{} err = c.Get(context.TODO(), types.NamespacedName{ Name: "kafka-cluster-ca-cert", @@ -132,20 +97,15 @@ func GetConfluentConfigMapFromManagedHub(producer bool) (*kafka.ConfigMap, error fmt.Println(">> cluster ca secret:", caCertSecret.Name) - caCrtPath := "/tmp/ca.crt" - err = os.WriteFile(caCrtPath, caCertSecret.Data["ca.crt"], 0o600) - if err != nil { - log.Fatalf("failed to write ca.crt: %v", err) - return nil, err - } - consumerGroupId := "test-group-id-managed-hub" kafkaConfig := &transport.KafkaConfig{ - BootstrapServer: bootstrapSever, - EnableTLS: true, - CaCertPath: caCrtPath, - ClientCertPath: clientCrtPath, - ClientKeyPath: clientKeyPath, + EnableTLS: true, + ConnCredential: &transport.ConnCredential{ + BootstrapServer: string(caCertSecret.Data["bootstrap_server"]), + CACert: string(caCertSecret.Data["ca.crt"]), + ClientCert: string(caCertSecret.Data["client.crt"]), + ClientKey: string(caCertSecret.Data["client.key"]), + }, ConsumerConfig: &transport.KafkaConsumerConfig{ ConsumerID: consumerGroupId, }, diff --git a/test/integration/agent/status/suite_test.go b/test/integration/agent/status/suite_test.go index 8be1d6e68..5b1f2811b 100644 --- a/test/integration/agent/status/suite_test.go +++ b/test/integration/agent/status/suite_test.go @@ -76,13 +76,15 @@ var _ = BeforeSuite(func() { agentConfig := &config.AgentConfig{ LeafHubName: leafHubName, TransportConfig: &transport.TransportConfig{ - CommitterInterval: 1 * time.Second, - TransportType: string(transport.Chan), + TransportType: string(transport.Chan), KafkaConfig: &transport.KafkaConfig{ Topics: &transport.ClusterTopic{ StatusTopic: "event", SpecTopic: "spec", }, + ConsumerConfig: &transport.KafkaConsumerConfig{ + CommitterInterval: 1 * time.Second, + }, }, }, EnableGlobalResource: true, diff --git a/test/integration/manager/controller/hub_management_test.go b/test/integration/manager/controller/hub_management_test.go index fb2d5a006..ead395c9e 100644 --- a/test/integration/manager/controller/hub_management_test.go +++ b/test/integration/manager/controller/hub_management_test.go @@ -13,6 +13,7 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement" "github.com/stolostron/multicluster-global-hub/pkg/database" "github.com/stolostron/multicluster-global-hub/pkg/database/models" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) var _ = Describe("hub management", func() { @@ -102,3 +103,7 @@ type tmpProducer struct{} func (p *tmpProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error { return nil } + +func (p *tmpProducer) Reconnect(config *transport.TransportConfig) error { + return nil +} diff --git a/test/integration/manager/spec/suite_test.go b/test/integration/manager/spec/suite_test.go index 05bd181c5..3c7c56475 100644 --- a/test/integration/manager/spec/suite_test.go +++ b/test/integration/manager/spec/suite_test.go @@ -106,8 +106,7 @@ var _ = BeforeSuite(func() { DeletedLabelsTrimmingInterval: 2 * time.Second, }, TransportConfig: &transport.TransportConfig{ - TransportType: string(transport.Chan), - CommitterInterval: 10 * time.Second, + TransportType: string(transport.Chan), }, StatisticsConfig: &statistics.StatisticsConfig{}, NonK8sAPIServerConfig: &nonk8sapi.NonK8sAPIServerConfig{}, diff --git a/test/integration/manager/status/suite_test.go b/test/integration/manager/status/suite_test.go index 93fd34e1c..953449c32 100644 --- a/test/integration/manager/status/suite_test.go +++ b/test/integration/manager/status/suite_test.go @@ -22,6 +22,7 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/database" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" + "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" genericproducer "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" "github.com/stolostron/multicluster-global-hub/test/integration/utils/testpostgres" ) @@ -100,8 +101,16 @@ var _ = BeforeSuite(func() { producer, err = genericproducer.NewGenericProducer(managerConfig.TransportConfig, "event") Expect(err).NotTo(HaveOccurred()) + consumer, err := consumer.NewGenericConsumer(managerConfig.TransportConfig, []string{"event"}, + consumer.EnableDatabaseOffset(false)) + Expect(err).NotTo(HaveOccurred()) + + // use manager to start the consumer + err = mgr.Add(consumer) + Expect(err).NotTo(HaveOccurred()) + By("Add controllers to manager") - err = statussyncer.AddStatusSyncers(mgr, managerConfig) + err = statussyncer.AddStatusSyncers(mgr, consumer, managerConfig) Expect(err).ToNot(HaveOccurred()) By("Start the manager") diff --git a/test/integration/operator/addon/addon_deploy_test.go b/test/integration/operator/addon/addon_deploy_test.go index ddd73f6d0..76d17fabc 100644 --- a/test/integration/operator/addon/addon_deploy_test.go +++ b/test/integration/operator/addon/addon_deploy_test.go @@ -74,7 +74,7 @@ var _ = Describe("addon deploy", func() { It("Should create HoH agent when an OCP without deployMode label is imported", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) By("By preparing an OCP Managed Clusters") prepareCluster(clusterName, @@ -87,7 +87,7 @@ var _ = Describe("addon deploy", func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -103,13 +103,13 @@ var _ = Describe("addon deploy", func() { }, work) }, timeout, interval).ShouldNot(HaveOccurred()) - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(8)) }) It("Should create HoH agent and ACM when an OCP is imported", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) By("By preparing an OCP Managed Clusters") prepareCluster(clusterName, @@ -130,7 +130,7 @@ var _ = Describe("addon deploy", func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -147,14 +147,14 @@ var _ = Describe("addon deploy", func() { }, timeout, interval).ShouldNot(HaveOccurred()) // contains both the ACM and the Global Hub manifests - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(18)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(17)) }) It("Should create HoH addon when an OCP with deploy mode = default is imported in hosted mode", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) hostingClusterName := fmt.Sprintf("hub-hosting-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) By("By preparing clusters") prepareCluster(clusterName, @@ -176,7 +176,7 @@ var _ = Describe("addon deploy", func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -192,16 +192,16 @@ var _ = Describe("addon deploy", func() { }, work) }, timeout, interval).ShouldNot(HaveOccurred()) - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(8)) }) It("Should create HoH addon when an OCP with deploy mode = Hosted is imported in hosted mode", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) hostingClusterName := fmt.Sprintf("hub-hosting-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) hostingWorkName := fmt.Sprintf("addon-%s-deploy-hosting-%s-0", - operatorconstants.GHManagedClusterAddonName, clusterName) + constants.GHManagedClusterAddonName, clusterName) By("By preparing clusters") prepareCluster(clusterName, map[string]string{ @@ -227,7 +227,7 @@ var _ = Describe("addon deploy", func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -259,9 +259,9 @@ var _ = Describe("addon deploy", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) hostingClusterName := fmt.Sprintf("hub-hosting-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) hostingWorkName := fmt.Sprintf("addon-%s-deploy-hosting-%s-0", - operatorconstants.GHManagedClusterAddonName, clusterName) + constants.GHManagedClusterAddonName, clusterName) By("By preparing clusters") prepareCluster(clusterName, map[string]string{ @@ -293,7 +293,7 @@ var _ = Describe("addon deploy", func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) diff --git a/test/integration/operator/addon/addon_registry_test.go b/test/integration/operator/addon/addon_registry_test.go index c30a6eb50..924ec01b9 100644 --- a/test/integration/operator/addon/addon_registry_test.go +++ b/test/integration/operator/addon/addon_registry_test.go @@ -17,7 +17,7 @@ import ( clusterv1 "open-cluster-management.io/api/cluster/v1" workv1 "open-cluster-management.io/api/work/v1" - operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/constants" ) // go test ./test/integration/operator/addon -ginkgo.focus "addon registry" -v @@ -26,7 +26,7 @@ var _ = Describe("addon registry", Ordered, func() { }) It("Should update the image pull secret from the mgh cr", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) - workName := fmt.Sprintf("addon-%s-deploy-0", operatorconstants.GHManagedClusterAddonName) + workName := fmt.Sprintf("addon-%s-deploy-0", constants.GHManagedClusterAddonName) By("By preparing an OCP Managed Clusters") prepareCluster(clusterName, @@ -39,7 +39,7 @@ var _ = Describe("addon registry", Ordered, func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -77,7 +77,7 @@ var _ = Describe("addon registry", Ordered, func() { It("Should update the image registry and pull secret from ManagedClusterImageRegistry", func() { clusterName := fmt.Sprintf("hub-%s", rand.String(6)) workName := fmt.Sprintf("addon-%s-deploy-0", - operatorconstants.GHManagedClusterAddonName) + constants.GHManagedClusterAddonName) By("By preparing the image registry pull secret") imageRegistrySecret := &corev1.Secret{ @@ -105,7 +105,7 @@ var _ = Describe("addon registry", Ordered, func() { addon := &addonv1alpha1.ManagedClusterAddOn{} Eventually(func() error { return runtimeClient.Get(ctx, types.NamespacedName{ - Name: operatorconstants.GHManagedClusterAddonName, + Name: constants.GHManagedClusterAddonName, Namespace: clusterName, }, addon) }, timeout, interval).ShouldNot(HaveOccurred())