diff --git a/agent/cmd/agent/main.go b/agent/cmd/agent/main.go index 974279125..415be45d2 100644 --- a/agent/cmd/agent/main.go +++ b/agent/cmd/agent/main.go @@ -13,7 +13,6 @@ import ( corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" clusterv1 "open-cluster-management.io/api/cluster/v1" @@ -34,7 +33,7 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/jobs" commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" "github.com/stolostron/multicluster-global-hub/pkg/transport" - "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" + "github.com/stolostron/multicluster-global-hub/pkg/transport/controller" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -106,11 +105,11 @@ func parseFlags() *config.AgentConfig { agentConfig := &config.AgentConfig{ ElectionConfig: &commonobjects.LeaderElectionConfig{}, TransportConfig: &transport.TransportConfig{ - KafkaConfig: &transport.KafkaConfig{ - Topics: &transport.ClusterTopic{}, - ProducerConfig: &transport.KafkaProducerConfig{}, - ConsumerConfig: &transport.KafkaConsumerConfig{}, - }, + // IsManager specifies the send/receive topics from specTopic and statusTopic + // For example, SpecTopic sends and statusTopic receives on the manager; the agent is the opposite + IsManager: false, + // EnableDatabaseOffset affects only the manager, deciding if consumption starts from a database-stored offset + EnableDatabaseOffset: false, }, } @@ -121,24 +120,6 @@ func parseFlags() *config.AgentConfig { pflag.CommandLine.AddGoFlagSet(defaultFlags) pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "", - "The bootstrap server for kafka.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "", - "The path of CA certificate for kafka bootstrap server.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "", - "The path of client certificate for kafka bootstrap server.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "", - "The path of client key for kafka bootstrap server.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "", - "Producer Id for the kafka, default is the leaf hub name.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic", - "event", "Topic for the kafka producer.") - pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, - "kafka-message-size-limit", 940, "The limit for kafka message size in KB.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic", - "spec", "Topic for the kafka consumer.") - pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id", - "multicluster-global-hub-agent", "ID for the kafka consumer.") pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace, "The agent running namespace, also used as leader election namespace") pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka", @@ -147,9 +128,6 @@ func parseFlags() 
*config.AgentConfig { "The goroutine number to propagate the bundles on managed cluster.") pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false, "enable hoh RBAC or not, default false") - pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType, - "transport-message-compression-type", "gzip", - "The message compression type for transport layer, 'gzip' or 'no-op'.") pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor, "status-delta-count-switch-factor", 100, "default with 100.") @@ -180,19 +158,11 @@ func completeConfig(agentConfig *config.AgentConfig) error { if agentConfig.LeafHubName == "" { return fmt.Errorf("flag managed-hub-name can't be empty") } - if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID == "" { - agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID = agentConfig.LeafHubName - } + agentConfig.TransportConfig.ConsumerGroupId = agentConfig.LeafHubName if agentConfig.SpecWorkPoolSize < 1 || agentConfig.SpecWorkPoolSize > 100 { return fmt.Errorf("flag consumer-worker-pool-size should be in the scope [1, 100]") } - - if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > producer.MaxMessageKBLimit { - return fmt.Errorf("flag kafka-message-size-limit %d must not exceed %d", - agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, producer.MaxMessageKBLimit) - } - agentConfig.TransportConfig.KafkaConfig.EnableTLS = true if agentConfig.MetricsAddress == "" { agentConfig.MetricsAddress = fmt.Sprintf("%s:%d", metricsHost, metricsPort) } @@ -236,25 +206,38 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) ( if err != nil { return nil, fmt.Errorf("failed to create a new manager: %w", err) } - kubeClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to create kubeclient: %w", err) - } // Need this controller to update the value of clusterclaim hub.open-cluster-management.io - // we use the value to decide whether install the ACM or not - if err := controllers.AddHubClusterClaimController(mgr); err != nil { - return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err) - } - if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil { - return nil, fmt.Errorf("failed to add crd controller: %w", err) + err = controller.NewTransportCtrl( + agentConfig.PodNameSpace, + constants.GHTransportConfigSecret, + transportCallback(mgr, agentConfig), + agentConfig.TransportConfig, + ).SetupWithManager(mgr) + if err != nil { + return nil, err } + setupLog.Info("add the transport controller to agent") + return mgr, nil +} - if err := controllers.AddCertController(mgr, kubeClient); err != nil { - return nil, fmt.Errorf("failed to add crd controller: %w", err) - } +// if the transport consumer and producer is ready then the func will be invoked by the transport controller +func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig, +) controller.TransportCallback { + return func(producer transport.Producer, consumer transport.Consumer) error { + // Need this controller to update the value of clusterclaim hub.open-cluster-management.io + // we use the value to decide whether install the ACM or not + if err := controllers.AddHubClusterClaimController(mgr); err != nil { + return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err) + } - return mgr, nil + if err := controllers.AddCRDController(mgr, 
mgr.GetConfig(), agentConfig, producer, consumer); err != nil { + return fmt.Errorf("failed to add crd controller: %w", err) + } + + setupLog.Info("add the agent controllers to manager") + return nil + } } func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) { diff --git a/agent/pkg/config/lease.go b/agent/pkg/config/lease.go index c831cc8fc..a3969ead5 100644 --- a/agent/pkg/config/lease.go +++ b/agent/pkg/config/lease.go @@ -19,6 +19,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +var leaseCtrlStarted bool + const ( leaseUpdateJitterFactor = 0.25 defaultLeaseDurationSeconds = 60 @@ -36,6 +38,10 @@ type leaseUpdater struct { // AddHoHLeaseUpdater creates a new LeaseUpdater instance aand add it to given manager func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) error { + if leaseCtrlStarted { + return nil + } + var config *rest.Config if isAgentTesting, ok := os.LookupEnv("AGENT_TESTING"); ok && isAgentTesting == "true" { config = mgr.GetConfig() @@ -59,7 +65,7 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro return err } - return mgr.Add(&leaseUpdater{ + err = mgr.Add(&leaseUpdater{ log: ctrl.Log.WithName("multicluster-global-hub-lease-updater"), client: c, leaseName: addonName, @@ -72,6 +78,11 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro "name=multicluster-global-hub-agent"), }, }) + if err != nil { + return err + } + leaseCtrlStarted = true + return nil } // Start starts a goroutine to update lease to implement controller-runtime Runnable interface diff --git a/agent/pkg/controllers/cert_controller.go b/agent/pkg/controllers/cert_controller.go deleted file mode 100644 index c8cfcc6b5..000000000 --- a/agent/pkg/controllers/cert_controller.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2024 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package controllers - -import ( - "context" - "reflect" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - "github.com/stolostron/multicluster-global-hub/pkg/constants" - "github.com/stolostron/multicluster-global-hub/pkg/utils" -) - -// certController is used to watch if the kafka cert(constants.KafkaCertSecretName) changed, -// if changed, restart agent pod -type certController struct { - kubeClient kubernetes.Interface - log logr.Logger -} - -// Restart the agent pod when secret data changed -func (c *certController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - reqLogger := c.log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) - reqLogger.V(2).Info("cert controller", "NamespacedName:", request.NamespacedName) - err := utils.RestartPod(ctx, c.kubeClient, constants.GHAgentNamespace, constants.AgentDeploymentName) - if err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil -} - -func AddCertController(mgr ctrl.Manager, kubeClient kubernetes.Interface) error { - return ctrl.NewControllerManagedBy(mgr).Named("cert-controller"). 
- For(&corev1.Secret{}, builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return false - }, - UpdateFunc: func(e event.UpdateEvent) bool { - if e.ObjectNew.GetName() != constants.KafkaCertSecretName { - return false - } - newSecret := e.ObjectNew.(*corev1.Secret) - oldSecret := e.ObjectOld.(*corev1.Secret) - // only enqueue the obj when secret data changed - return !reflect.DeepEqual(newSecret.Data, oldSecret.Data) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return true - }, - })). - Complete(&certController{ - kubeClient: kubeClient, - log: ctrl.Log.WithName("cert-controller"), - }) -} diff --git a/agent/pkg/controllers/crd_controller.go b/agent/pkg/controllers/crd_controller.go index fe078d91e..2b9cf2473 100644 --- a/agent/pkg/controllers/crd_controller.go +++ b/agent/pkg/controllers/crd_controller.go @@ -18,13 +18,18 @@ import ( "github.com/stolostron/multicluster-global-hub/agent/pkg/config" specController "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller" statusController "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) +var crdCtrlStarted = false + type crdController struct { mgr ctrl.Manager log logr.Logger restConfig *rest.Config agentConfig *config.AgentConfig + producer transport.Producer + consumer transport.Consumer } func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { @@ -32,12 +37,12 @@ func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ct reqLogger.V(2).Info("crd controller", "NamespacedName:", request.NamespacedName) // add spec controllers - if err := specController.AddToManager(c.mgr, c.agentConfig); err != nil { + if err := specController.AddToManager(c.mgr, c.consumer, c.agentConfig); err != nil { return ctrl.Result{}, fmt.Errorf("failed to add spec syncer: %w", err) } reqLogger.V(2).Info("add spec controllers to manager") - if err := statusController.AddControllers(ctx, c.mgr, c.agentConfig); err != nil { + if err := statusController.AddControllers(ctx, c.mgr, c.producer, c.agentConfig); err != nil { return ctrl.Result{}, fmt.Errorf("failed to add status syncer: %w", err) } @@ -56,8 +61,13 @@ func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ct // this controller is used to watch the multiclusterhub crd or clustermanager crd // if the crd exists, then add controllers to the manager dynamically -func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig) error { - return ctrl.NewControllerManagedBy(mgr). +func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig, + producer transport.Producer, consumer transport.Consumer, +) error { + if crdCtrlStarted { + return nil + } + if err := ctrl.NewControllerManagedBy(mgr). 
For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.Funcs{ // trigger the reconciler only if the crd is created CreateFunc: func(e event.CreateEvent) bool { @@ -77,6 +87,12 @@ func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *co mgr: mgr, restConfig: restConfig, agentConfig: agentConfig, + producer: producer, + consumer: consumer, log: ctrl.Log.WithName("crd-controller"), - }) + }); err != nil { + return err + } + crdCtrlStarted = true + return nil } diff --git a/agent/pkg/controllers/hub_clusterclaim_controller.go b/agent/pkg/controllers/hub_clusterclaim_controller.go index 7d6e74873..a0431428f 100644 --- a/agent/pkg/controllers/hub_clusterclaim_controller.go +++ b/agent/pkg/controllers/hub_clusterclaim_controller.go @@ -24,6 +24,8 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/utils" ) +var clusterClaimCtrlStared = false + type hubClusterClaimController struct { client client.Client log logr.Logger @@ -40,6 +42,9 @@ func (c *hubClusterClaimController) Reconcile(ctx context.Context, request ctrl. func AddHubClusterClaimController(mgr ctrl.Manager) error { // the controller is only to trigger create hub clusterClaim at the beginning // do nothing if the hub clusterClaim existed + if clusterClaimCtrlStared { + return nil + } clusterClaimPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool { if object.GetName() == constants.HubClusterClaimName { return false @@ -47,12 +52,17 @@ func AddHubClusterClaimController(mgr ctrl.Manager) error { clusterClaim, _ := getClusterClaim(context.Background(), mgr.GetClient(), constants.HubClusterClaimName) return clusterClaim == nil }) - return ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller"). + err := ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller"). For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)). Complete(&hubClusterClaimController{ client: mgr.GetClient(), log: ctrl.Log.WithName("hubclusterclaim-controller"), }) + if err != nil { + return err + } + clusterClaimCtrlStared = true + return nil } func updateHubClusterClaim(ctx context.Context, k8sClient client.Client, diff --git a/agent/pkg/controllers/version_clusterclaim_controller.go b/agent/pkg/controllers/version_clusterclaim_controller.go index 93704cd8a..3b3b7d586 100644 --- a/agent/pkg/controllers/version_clusterclaim_controller.go +++ b/agent/pkg/controllers/version_clusterclaim_controller.go @@ -20,6 +20,8 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/constants" ) +var clusterVersionCtrlStarted = false + type versionClusterClaimController struct { client client.Client log logr.Logger @@ -46,16 +48,24 @@ func (c *versionClusterClaimController) Reconcile(ctx context.Context, request c } func AddVersionClusterClaimController(mgr ctrl.Manager) error { + if clusterVersionCtrlStarted { + return nil + } clusterClaimPredicate, _ := predicate.LabelSelectorPredicate(metav1.LabelSelector{ MatchLabels: map[string]string{ constants.GlobalHubOwnerLabelKey: constants.GHAgentOwnerLabelValue, }, }) - return ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller"). + err := ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller"). For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)). Watches(&mchv1.MultiClusterHub{}, &handler.EnqueueRequestForObject{}). 
Complete(&versionClusterClaimController{ client: mgr.GetClient(), log: ctrl.Log.WithName("clusterclaim-controller"), }) + if err != nil { + return err + } + clusterVersionCtrlStarted = true + return nil } diff --git a/agent/pkg/spec/controller/controller.go b/agent/pkg/spec/controller/controller.go index e743737a5..7bd41e995 100644 --- a/agent/pkg/spec/controller/controller.go +++ b/agent/pkg/spec/controller/controller.go @@ -9,19 +9,14 @@ import ( "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller/syncers" "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller/workers" "github.com/stolostron/multicluster-global-hub/pkg/constants" - genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) -func AddToManager(mgr ctrl.Manager, agentConfig *config.AgentConfig) error { - // add consumer to manager - consumer, err := genericconsumer.NewGenericConsumer(agentConfig.TransportConfig, - []string{agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic}, - ) - if err != nil { - return fmt.Errorf("failed to initialize transport consumer: %w", err) - } - if err := mgr.Add(consumer); err != nil { - return fmt.Errorf("failed to add transport consumer to manager: %w", err) +var specCtrlStarted = false + +func AddToManager(mgr ctrl.Manager, consumer transport.Consumer, agentConfig *config.AgentConfig) error { + if specCtrlStarted { + return nil } // add worker pool to manager @@ -45,5 +40,7 @@ func AddToManager(mgr ctrl.Manager, agentConfig *config.AgentConfig) error { } dispatcher.RegisterSyncer(constants.ResyncMsgKey, syncers.NewResyncSyncer()) + + specCtrlStarted = true return nil } diff --git a/agent/pkg/status/controller/controller.go b/agent/pkg/status/controller/controller.go index 70c3e8379..61ee237fa 100644 --- a/agent/pkg/status/controller/controller.go +++ b/agent/pkg/status/controller/controller.go @@ -18,20 +18,21 @@ import ( "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/managedclusters" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/placement" "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller/policies" - transportproducer "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) +var statusCtrlStarted = false + // AddControllers adds all the controllers to the Manager. 
-func AddControllers(ctx context.Context, mgr ctrl.Manager, agentConfig *config.AgentConfig) error { - if err := agentstatusconfig.AddConfigController(mgr, agentConfig); err != nil { - return fmt.Errorf("failed to add ConfigMap controller: %w", err) +func AddControllers(ctx context.Context, mgr ctrl.Manager, producer transport.Producer, + agentConfig *config.AgentConfig, +) error { + if statusCtrlStarted { + return nil } - // only use the cloudevents - producer, err := transportproducer.NewGenericProducer(agentConfig.TransportConfig, - agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic) - if err != nil { - return fmt.Errorf("failed to init status transport producer: %w", err) + if err := agentstatusconfig.AddConfigController(mgr, agentConfig); err != nil { + return fmt.Errorf("failed to add ConfigMap controller: %w", err) } // managed cluster @@ -40,7 +41,7 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, agentConfig *config.A } // event syncer - err = event.LaunchEventSyncer(ctx, mgr, agentConfig, producer) + err := event.LaunchEventSyncer(ctx, mgr, agentConfig, producer) if err != nil { return fmt.Errorf("failed to launch event syncer: %w", err) } @@ -81,8 +82,10 @@ func AddControllers(ctx context.Context, mgr ctrl.Manager, agentConfig *config.A // lunch a time filter, it must be called after filter.RegisterTimeFilter(key) if err := filter.LaunchTimeFilter(ctx, mgr.GetClient(), agentConfig.PodNameSpace, - agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic); err != nil { + agentConfig.TransportConfig.KafkaCredential.StatusTopic); err != nil { return fmt.Errorf("failed to launch time filter: %w", err) } + + statusCtrlStarted = true return nil } diff --git a/agent/pkg/status/controller/event/event_syncer.go b/agent/pkg/status/controller/event/event_syncer.go index 16739dbf3..fbb6a95df 100644 --- a/agent/pkg/status/controller/event/event_syncer.go +++ b/agent/pkg/status/controller/event/event_syncer.go @@ -37,7 +37,7 @@ var eventPredicateFunc = predicate.NewPredicateFuncs(func(obj client.Object) boo func LaunchEventSyncer(ctx context.Context, mgr ctrl.Manager, agentConfig *config.AgentConfig, producer transport.Producer, ) error { - eventTopic := agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic + eventTopic := agentConfig.TransportConfig.KafkaCredential.StatusTopic instance := func() client.Object { return &corev1.Event{} diff --git a/agent/pkg/status/controller/policies/policy_syncer.go b/agent/pkg/status/controller/policies/policy_syncer.go index d19387630..c0b8b31bb 100644 --- a/agent/pkg/status/controller/policies/policy_syncer.go +++ b/agent/pkg/status/controller/policies/policy_syncer.go @@ -56,7 +56,7 @@ func LaunchPolicySyncer(ctx context.Context, mgr ctrl.Manager, agentConfig *conf utils.HasLabel(obj, constants.PolicyEventRootPolicyNameLabelKey) // replicated policy }, mgr.GetClient(), - agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, + agentConfig.TransportConfig.KafkaCredential.StatusTopic, ) // 4. 
local policy spec diff --git a/manager/cmd/manager/main.go b/manager/cmd/manager/main.go index 78014eaac..814ab83d9 100644 --- a/manager/cmd/manager/main.go +++ b/manager/cmd/manager/main.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" "github.com/stolostron/multicluster-global-hub/manager/pkg/backup" + "github.com/stolostron/multicluster-global-hub/manager/pkg/config" managerconfig "github.com/stolostron/multicluster-global-hub/manager/pkg/config" "github.com/stolostron/multicluster-global-hub/manager/pkg/cronjob" "github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement" @@ -39,7 +40,7 @@ import ( commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" - "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" + "github.com/stolostron/multicluster-global-hub/pkg/transport/controller" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -55,11 +56,10 @@ const ( ) var ( - setupLog = ctrl.Log.WithName("setup") - managerNamespace = constants.GHDefaultNamespace - enableSimulation = false - errFlagParameterEmpty = errors.New("flag parameter empty") - errFlagParameterIllegalValue = errors.New("flag parameter illegal value") + setupLog = ctrl.Log.WithName("setup") + managerNamespace = constants.GHDefaultNamespace + enableSimulation = false + errFlagParameterEmpty = errors.New("flag parameter empty") ) func init() { @@ -71,12 +71,9 @@ func parseFlags() *managerconfig.ManagerConfig { SyncerConfig: &managerconfig.SyncerConfig{}, DatabaseConfig: &managerconfig.DatabaseConfig{}, TransportConfig: &transport.TransportConfig{ - KafkaConfig: &transport.KafkaConfig{ - EnableTLS: true, - Topics: &transport.ClusterTopic{}, - ProducerConfig: &transport.KafkaProducerConfig{}, - ConsumerConfig: &transport.KafkaConsumerConfig{}, - }, + IsManager: true, + ConsumerGroupId: "global-hub-manager", + EnableDatabaseOffset: true, }, StatisticsConfig: &statistics.StatisticsConfig{}, NonK8sAPIServerConfig: &nonk8sapi.NonK8sAPIServerConfig{}, @@ -111,32 +108,10 @@ func parseFlags() *managerconfig.ManagerConfig { "transport-bridge-database-url", "", "The URL of database server for the transport-bridge user.") pflag.StringVar(&managerConfig.TransportConfig.TransportType, "transport-type", "kafka", "The transport type, 'kafka'.") - pflag.StringVar(&managerConfig.TransportConfig.MessageCompressionType, "transport-message-compression-type", - "gzip", "The message compression type for transport layer, 'gzip' or 'no-op'.") pflag.DurationVar(&managerConfig.TransportConfig.CommitterInterval, "transport-committer-interval", 40*time.Second, "The committer interval for transport layer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", - "kafka-kafka-bootstrap.kafka.svc:9092", "The bootstrap server for kafka.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClusterIdentity, "kafka-cluster-identity", - "", "The identity for kafka cluster.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "", - "The path of CA certificate for kafka bootstrap server.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "", - "The path of client certificate for kafka bootstrap server.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientKeyPath, 
"kafka-client-key-path", "", - "The path of client key for kafka bootstrap server.") pflag.StringVar(&managerConfig.DatabaseConfig.CACertPath, "postgres-ca-path", "/postgres-ca/ca.crt", "The path of CA certificate for kafka bootstrap server.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", - "multicluster-global-hub-manager", "ID for the kafka producer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-producer-topic", - "spec", "Topic for the kafka producer.") - pflag.IntVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, - "kafka-message-size-limit", 940, "The limit for kafka message size in KB.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, - "kafka-consumer-id", "multicluster-global-hub-manager", "ID for the kafka consumer.") - pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, - "kafka-consumer-topic", "event", "Topic for the kafka consumer.") pflag.StringVar(&managerConfig.StatisticsConfig.LogInterval, "statistics-log-interval", "1m", "The log interval for statistics.") pflag.StringVar(&managerConfig.NonK8sAPIServerConfig.ClusterAPIURL, "cluster-api-url", @@ -173,10 +148,6 @@ func completeConfig(managerConfig *managerconfig.ManagerConfig) error { if managerConfig.DatabaseConfig.ProcessDatabaseURL == "" { return fmt.Errorf("database url for process user: %w", errFlagParameterEmpty) } - if managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > producer.MaxMessageKBLimit { - return fmt.Errorf("%w - size must not exceed %d : %s", errFlagParameterIllegalValue, - managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, "kafka-message-size-limit") - } // the specified jobs(concatenate multiple jobs with ',') runs when the container starts val, ok := os.LookupEnv(launchJobNamesEnv) if ok && val != "" { @@ -242,51 +213,54 @@ func createManager(ctx context.Context, return nil, fmt.Errorf("failed to create a new manager: %w", err) } - producer, err := producer.NewGenericProducer(managerConfig.TransportConfig, - managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic) + // TODO: refactor the manager to start the conflation manager so that it can handle the events from restful API + err = controller.NewTransportCtrl(managerConfig.ManagerNamespace, constants.GHTransportConfigSecret, + transportCallback(mgr, managerConfig), + managerConfig.TransportConfig, + ).SetupWithManager(mgr) if err != nil { - return nil, fmt.Errorf("failed to init spec transport bridge: %w", err) + return nil, fmt.Errorf("failed to add the transport controller") } - // TODO: refactor the manager to start the conflation manager so that it can handle the events from restful API - - if !managerConfig.WithACM { - return mgr, nil + // the cronjob can start without producer and consumer + if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil { + return nil, fmt.Errorf("failed to add scheduler to manager: %w", err) + } + // need lock DB for backup + backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn) + if err := backupPVC.SetupWithManager(mgr); err != nil { + return nil, err } - if managerConfig.EnableGlobalResource { if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil { return nil, fmt.Errorf("failed to add non-k8s-api-server: %w", err) } } + return mgr, nil +} - if managerConfig.EnableGlobalResource { 
- if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil { - return nil, fmt.Errorf("failed to add global resource spec syncers: %w", err) +func transportCallback(mgr ctrl.Manager, managerConfig *config.ManagerConfig) controller.TransportCallback { + return func(producer transport.Producer, consumer transport.Consumer) error { + if !managerConfig.WithACM { + return nil + } + if managerConfig.EnableGlobalResource { + if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil { + return fmt.Errorf("failed to add global resource spec syncers: %w", err) + } } - } - - if err := statussyncer.AddStatusSyncers(mgr, managerConfig); err != nil { - return nil, fmt.Errorf("failed to add transport-to-db syncers: %w", err) - } - - // add hub management - if err := hubmanagement.AddHubManagement(mgr, producer); err != nil { - return nil, fmt.Errorf("failed to add hubmanagement to manager - %w", err) - } - // need lock DB for backup - backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn) - err = backupPVC.SetupWithManager(mgr) - if err != nil { - return nil, err - } + if err := statussyncer.AddStatusSyncers(mgr, consumer, managerConfig); err != nil { + return fmt.Errorf("failed to add transport-to-db syncers: %w", err) + } - if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil { - return nil, fmt.Errorf("failed to add scheduler to manager: %w", err) + // add hub management + if err := hubmanagement.AddHubManagement(mgr, producer); err != nil { + return fmt.Errorf("failed to add hubmanagement to manager - %w", err) + } + setupLog.Info("add the manager controllers to ctrl.Manager") + return nil } - - return mgr, nil } // function to handle defers with exit, see https://stackoverflow.com/a/27629493/553720. 
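Note on the wiring shared by the agent and manager main.go changes above: instead of building Kafka clients from command-line flags at startup, both binaries now register a TransportCtrl that watches the transport secret and hands ready clients back through a callback. A minimal sketch of that pattern, using the API introduced in this patch (addSyncers is a hypothetical placeholder for the real transportCallback bodies):

    func setupTransport(mgr ctrl.Manager, namespace string, transportConfig *transport.TransportConfig) error {
    	// invoked by TransportCtrl once the producer/consumer are built from the secret
    	callback := func(p transport.Producer, c transport.Consumer) error {
    		return addSyncers(mgr, p, c) // hypothetical: register the controllers that need transport
    	}
    	return controller.NewTransportCtrl(
    		namespace,                         // namespace of the transport secret
    		constants.GHTransportConfigSecret, // secret watched for kafka credentials
    		callback,
    		transportConfig,
    	).SetupWithManager(mgr)
    }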
diff --git a/manager/pkg/hubmanagement/hub_management.go b/manager/pkg/hubmanagement/hub_management.go index 330ea441b..ee36ae3ba 100644 --- a/manager/pkg/hubmanagement/hub_management.go +++ b/manager/pkg/hubmanagement/hub_management.go @@ -31,6 +31,8 @@ const ( ProbeDuration = 2 * time.Minute // the duration to detect run the updating ) +var hubMgrStarted = false + // manage the leaf hub lifecycle based on the heartbeat type HubManagement struct { log logr.Logger @@ -49,7 +51,14 @@ func NewHubManagement(producer transport.Producer, probeDuration, activeTimeout } func AddHubManagement(mgr ctrl.Manager, producer transport.Producer) error { - return mgr.Add(NewHubManagement(producer, ProbeDuration, ActiveTimeout)) + if hubMgrStarted { + return nil + } + if err := mgr.Add(NewHubManagement(producer, ProbeDuration, ActiveTimeout)); err != nil { + return err + } + hubMgrStarted = true + return nil } func (h *HubManagement) Start(ctx context.Context) error { diff --git a/manager/pkg/specsyncer/syncers.go b/manager/pkg/specsyncer/syncers.go index 2b4497d93..c14029efd 100644 --- a/manager/pkg/specsyncer/syncers.go +++ b/manager/pkg/specsyncer/syncers.go @@ -11,10 +11,16 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/transport" ) +var specCtrlStarted = false + func AddGlobalResourceSpecSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig, producer transport.Producer, ) error { + if specCtrlStarted { + return nil + } + if err := spec2db.AddSpec2DBControllers(mgr); err != nil { return fmt.Errorf("failed to add spec-to-db controllers: %w", err) } @@ -28,5 +34,6 @@ func AddGlobalResourceSpecSyncers(mgr ctrl.Manager, return fmt.Errorf("failed to add status db watchers: %w", err) } + specCtrlStarted = true return nil } diff --git a/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go b/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go index 19d37bdb0..deb7d11c6 100644 --- a/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go +++ b/manager/pkg/statussyncer/dispatcher/transport_dispatcher.go @@ -11,7 +11,6 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" - genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" ) // Get message from transport, convert it to bundle and forward it to conflation manager. 
@@ -22,21 +21,9 @@ type TransportDispatcher struct { statistic *statistics.Statistics } -func AddTransportDispatcher(mgr ctrl.Manager, managerConfig *config.ManagerConfig, +func AddTransportDispatcher(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig, conflationManager *conflator.ConflationManager, stats *statistics.Statistics, ) error { - // start a consumer - topics := managerConfig.TransportConfig.KafkaConfig.Topics - consumer, err := genericconsumer.NewGenericConsumer(managerConfig.TransportConfig, - []string{topics.StatusTopic}, - genericconsumer.EnableDatabaseOffset(true)) - if err != nil { - return fmt.Errorf("failed to initialize transport consumer: %w", err) - } - if err := mgr.Add(consumer); err != nil { - return fmt.Errorf("failed to add transport consumer to manager: %w", err) - } - transportDispatcher := &TransportDispatcher{ log: ctrl.Log.WithName("conflation-dispatcher"), consumer: consumer, diff --git a/manager/pkg/statussyncer/syncers.go b/manager/pkg/statussyncer/syncers.go index fc0d61b56..2f00fcdbf 100644 --- a/manager/pkg/statussyncer/syncers.go +++ b/manager/pkg/statussyncer/syncers.go @@ -10,11 +10,17 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/dispatcher" dbsyncer "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/syncers" "github.com/stolostron/multicluster-global-hub/pkg/statistics" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) +var statusCtrlStarted = false + // AddStatusSyncers performs the initial setup required before starting the runtime manager. // adds controllers and/or runnables to the manager, registers handler to conflation manager -func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) error { +func AddStatusSyncers(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig) error { + if statusCtrlStarted { + return nil + } // create statistics stats := statistics.NewStatistics(managerConfig.StatisticsConfig) if err := mgr.Add(stats); err != nil { @@ -26,7 +32,7 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) err registerHandler(conflationManager, managerConfig.EnableGlobalResource) // start consume message from transport to conflation manager - if err := dispatcher.AddTransportDispatcher(mgr, managerConfig, conflationManager, stats); err != nil { + if err := dispatcher.AddTransportDispatcher(mgr, consumer, managerConfig, conflationManager, stats); err != nil { return err } @@ -40,6 +46,7 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) err if err := mgr.Add(committer); err != nil { return fmt.Errorf("failed to start the offset committer: %w", err) } + statusCtrlStarted = true return nil } diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 4b3c659a5..4e01ba0c6 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -19,7 +19,6 @@ import ( addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" clusterv1 "open-cluster-management.io/api/cluster/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/kustomize/kyaml/yaml" globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" @@ -193,7 +192,12 @@ func (a *HohAgentAddon) 
GetValues(cluster *clusterv1.ManagedCluster, return nil, fmt.Errorf("failed to update the kafkauser for the cluster(%s): %v", cluster.Name, err) } - kafkaConfigYaml, err := yaml.Marshal(kafkaConnection) + // if the credential is byo, don't need add secret certs + attachCertSecrets := true + if config.IsBYOKafka() { + attachCertSecrets = false + } + kafkaConfigYaml, err := kafkaConnection.YamlMarshal(attachCertSecrets) if err != nil { return nil, fmt.Errorf("failed to marshalling the kafka config yaml: %w", err) } diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml index 0430d687c..48b77d7c6 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-deployment.yaml @@ -38,16 +38,8 @@ spec: - --zap-log-level={{.LogLevel}} - --pod-namespace=$(POD_NAMESPACE) - --leaf-hub-name={{ .LeafHubID }} - - --kafka-consumer-id={{ .LeafHubID }} - --enforce-hoh-rbac=false - --transport-type={{ .TransportType }} - - --kafka-bootstrap-server={{ .KafkaBootstrapServer }} - - --kafka-ca-cert-path=/kafka-cluster-ca/ca.crt - - --kafka-client-cert-path=/kafka-client-certs/tls.crt - - --kafka-client-key-path=/kafka-client-certs/tls.key - - --kafka-consumer-topic={{.KafkaConsumerTopic}} - - --kafka-producer-topic={{.KafkaProducerTopic}} - - --transport-message-compression-type={{.MessageCompressionType}} - --lease-duration={{.LeaseDuration}} - --renew-deadline={{.RenewDeadline}} - --retry-period={{.RetryPeriod}} @@ -66,13 +58,6 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace - volumeMounts: - - mountPath: /kafka-cluster-ca - name: kafka-cluster-ca - readOnly: true - - mountPath: /kafka-client-certs - name: kafka-client-certs - readOnly: true {{- if .ImagePullSecretName }} imagePullSecrets: - name: {{ .ImagePullSecretName }} @@ -91,11 +76,4 @@ spec: tolerationSeconds: {{.TolerationSeconds}} {{- end}} {{- end}} - volumes: - - name: kafka-cluster-ca - secret: - secretName: {{.KafkaClusterCASecret}} - - name: kafka-client-certs - secret: - secretName: {{.KafkaClientCertSecret}} {{ end }} diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 6babd9e92..d53d63b13 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -14,7 +14,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/kustomize/kyaml/yaml" "github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" @@ -91,7 +90,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, } transportConn := config.GetTransporterConn() - if transportConn == nil { + if transportConn == nil || transportConn.BootstrapServer == "" { return fmt.Errorf("the transport connection(%s) must not be empty", transportConn) } @@ -111,7 +110,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, return fmt.Errorf("failed to get the electionConfig %w", err) } - kafkaConfigYaml, err := yaml.Marshal(transportConn) + kafkaConfigYaml, err := transportConn.YamlMarshal(false) if err != nil { return 
fmt.Errorf("failed to marshall kafka connetion for config: %w", err) } diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml index 8c3db3365..f87cf2276 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml +++ b/operator/pkg/controllers/hubofhubs/manager/manifests/deployment.yaml @@ -38,15 +38,7 @@ spec: - --manager-namespace=$(POD_NAMESPACE) - --watch-namespace=$(WATCH_NAMESPACE) - --transport-type={{.TransportType}} - - --kafka-bootstrap-server={{.KafkaBootstrapServer}} - - --kafka-cluster-identity={{.KafkaClusterIdentity}} - - --kafka-consumer-topic={{.KafkaConsumerTopic}} - - --kafka-producer-topic={{.KafkaProducerTopic}} - - --kafka-ca-cert-path=/kafka-certs/ca.crt - - --kafka-client-cert-path=/kafka-certs/client.crt - - --kafka-client-key-path=/kafka-certs/client.key - --postgres-ca-path=/postgres-credential/ca.crt - - --transport-message-compression-type={{.MessageCompressionType}} - --process-database-url=$(DATABASE_URL) - --transport-bridge-database-url=$(DATABASE_URL) - --lease-duration={{.LeaseDuration}} @@ -95,9 +87,6 @@ spec: name: webhook-certs readOnly: true {{- end }} - - mountPath: /kafka-certs - name: kafka-certs - readOnly: true - mountPath: /postgres-credential name: postgres-credential readOnly: true @@ -166,9 +155,6 @@ spec: {{- end}} {{- end}} volumes: - - name: kafka-certs - secret: - secretName: kafka-certs-secret - name: postgres-credential secret: secretName: postgres-credential-secret diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go index 0f120f5ab..155ec36bd 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go @@ -90,7 +90,9 @@ var mghPred = predicate.Funcs{ }, } -func StartKafkaController(ctx context.Context, mgr ctrl.Manager, transporter transport.Transporter) (*KafkaController, error) { +func StartKafkaController(ctx context.Context, mgr ctrl.Manager, transporter transport.Transporter) ( + *KafkaController, error, +) { r := &KafkaController{ Manager: mgr, trans: transporter.(*strimziTransporter), diff --git a/pkg/transport/assembler_test.go b/pkg/transport/assembler_test.go index 0f8d2ced1..06dd1ae78 100644 --- a/pkg/transport/assembler_test.go +++ b/pkg/transport/assembler_test.go @@ -15,16 +15,21 @@ import ( ) func TestAssembler(t *testing.T) { - topic := "test" transportConfig := &transport.TransportConfig{ TransportType: string(transport.Chan), + KafkaCredential: &transport.KafkaConnCredential{ + SpecTopic: "spec", + StatusTopic: "status", + }, } - genericProducer, err := producer.NewGenericProducer(transportConfig, topic) + transportConfig.IsManager = true + genericProducer, err := producer.NewGenericProducer(transportConfig) assert.Nil(t, err) genericProducer.SetDataLimit(5) - genericConsumer, err := consumer.NewGenericConsumer(transportConfig, []string{topic}) + transportConfig.IsManager = false + genericConsumer, err := consumer.NewGenericConsumer(transportConfig) assert.Nil(t, err) go func() { err = genericConsumer.Start(context.TODO()) diff --git a/pkg/transport/config/confluent_config.go b/pkg/transport/config/confluent_config.go index b2909e7dd..c4060d1d9 100644 --- a/pkg/transport/config/confluent_config.go +++ b/pkg/transport/config/confluent_config.go @@ 
-47,7 +47,7 @@ func SetConsumerConfig(kafkaConfigMap *kafkav2.ConfigMap, groupId string) { func SetTLSByLocation(kafkaConfigMap *kafkav2.ConfigMap, caCertPath, certPath, keyPath string) error { _, validCA := utils.Validate(caCertPath) if !validCA { - return errors.New("invalid ca certificate") + return errors.New("invalid ca certificate for tls") } _, validCert := utils.Validate(certPath) _, validKey := utils.Validate(keyPath) @@ -106,6 +106,16 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (* // GetConfluentConfigMapByConfig tries to connect the kafka with transport secret(ca.key, client.crt, client.key) func GetConfluentConfigMapByConfig(transportConfig *corev1.Secret, c client.Client, consumerGroupID string) ( *kafkav2.ConfigMap, error, +) { + conn, err := GetTransportCredentailBySecret(transportConfig, c) + if err != nil { + return nil, err + } + return GetConfluentConfigMapByKafkaCredential(conn, consumerGroupID) +} + +func GetConfluentConfigMapByKafkaCredential(conn *transport.KafkaConnCredential, consumerGroupID string) ( + *kafkav2.ConfigMap, error, ) { kafkaConfigMap := GetBasicConfigMap() if consumerGroupID != "" { @@ -113,10 +123,6 @@ func GetConfluentConfigMapByConfig(transportConfig *corev1.Secret, c client.Clie } else { SetProducerConfig(kafkaConfigMap) } - conn, err := GetTransportCredentailBySecret(transportConfig, c) - if err != nil { - return nil, err - } _ = kafkaConfigMap.SetKey("bootstrap.servers", conn.BootstrapServer) // if the certs is invalid if conn.CACert == "" || conn.ClientCert == "" || conn.ClientKey == "" { @@ -136,7 +142,6 @@ func GetConfluentConfigMapByConfig(transportConfig *corev1.Secret, c client.Clie if err := kafkaConfigMap.SetKey("ssl.key.pem", conn.ClientKey); err != nil { return nil, err } - return kafkaConfigMap, nil } diff --git a/pkg/transport/consumer/generic_consumer.go b/pkg/transport/consumer/generic_consumer.go index dad9de0bb..d51db4ce5 100644 --- a/pkg/transport/consumer/generic_consumer.go +++ b/pkg/transport/consumer/generic_consumer.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "sync" kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -27,12 +28,16 @@ var transportID string type GenericConsumer struct { log logr.Logger - client cloudevents.Client assembler *messageAssembler eventChan chan *cloudevents.Event - consumeTopics []string - clusterIdentity string enableDatabaseOffset bool + clusterID string + + consumerCtx context.Context + consumerCancel context.CancelFunc + client cloudevents.Client + + mutex sync.Mutex } type GenericConsumeOption func(*GenericConsumer) error @@ -44,58 +49,62 @@ func EnableDatabaseOffset(enableOffset bool) GenericConsumeOption { } } -func NewGenericConsumer(tranConfig *transport.TransportConfig, topics []string, +func NewGenericConsumer(tranConfig *transport.TransportConfig, opts ...GenericConsumeOption, ) (*GenericConsumer, error) { - log := ctrl.Log.WithName(fmt.Sprintf("%s-consumer", tranConfig.TransportType)) - var receiver interface{} + c := &GenericConsumer{ + log: ctrl.Log.WithName(fmt.Sprintf("%s-consumer", tranConfig.TransportType)), + eventChan: make(chan *cloudevents.Event), + assembler: newMessageAssembler(), + enableDatabaseOffset: tranConfig.EnableDatabaseOffset, + } + if err := c.initClient(tranConfig); err != nil { + return nil, err + } + if err := c.applyOptions(opts...); err != nil { + return nil, err + } + return c, nil +} + +// initClient will init the consumer 
identity, clientProtocol, client +func (c *GenericConsumer) initClient(tranConfig *transport.TransportConfig) error { var err error - var clusterIdentity string + var clientProtocol interface{} + + c.clusterID = tranConfig.KafkaCredential.ClusterID + topics := []string{tranConfig.KafkaCredential.StatusTopic} + if !tranConfig.IsManager { + topics[0] = tranConfig.KafkaCredential.SpecTopic + } + switch tranConfig.TransportType { case string(transport.Kafka): - log.Info("transport consumer with cloudevents-kafka receiver") - receiver, err = getConfluentReceiverProtocol(tranConfig, topics) + c.log.Info("transport consumer with cloudevents-kafka receiver") + clientProtocol, err = getConfluentReceiverProtocol(tranConfig, topics) if err != nil { - return nil, err + return err } - clusterIdentity = tranConfig.KafkaConfig.ClusterIdentity case string(transport.Chan): - log.Info("transport consumer with go chan receiver") + c.log.Info("transport consumer with go chan receiver") if tranConfig.Extends == nil { tranConfig.Extends = make(map[string]interface{}) } - topic := "event" - if topics != nil && len(topics) > 0 { - topic = topics[0] - } + topic := topics[0] if _, found := tranConfig.Extends[topic]; !found { tranConfig.Extends[topic] = gochan.New() } - receiver = tranConfig.Extends[topic] - clusterIdentity = "kafka-cluster-chan" + clientProtocol = tranConfig.Extends[topic] default: - return nil, fmt.Errorf("transport-type - %s is not a valid option", tranConfig.TransportType) + return fmt.Errorf("transport-type - %s is not a valid option", tranConfig.TransportType) } - client, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) + c.client, err = cloudevents.NewClient(clientProtocol, client.WithPollGoroutines(1)) if err != nil { - return nil, err + return err } - c := &GenericConsumer{ - log: log, - client: client, - clusterIdentity: clusterIdentity, - eventChan: make(chan *cloudevents.Event), - assembler: newMessageAssembler(), - enableDatabaseOffset: false, - consumeTopics: topics, - } - if err := c.applyOptions(opts...); err != nil { - return nil, err - } - transportID = clusterIdentity - return c, nil + return nil } func (c *GenericConsumer) applyOptions(opts ...GenericConsumeOption) error { @@ -107,10 +116,32 @@ func (c *GenericConsumer) applyOptions(opts ...GenericConsumeOption) error { return nil } +func (c *GenericConsumer) Reconnect(ctx context.Context, tranConfig *transport.TransportConfig) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + err := c.initClient(tranConfig) + if err != nil { + return err + } + + // close the previous consumer + if c.consumerCancel != nil { + c.consumerCancel() + } + c.consumerCtx, c.consumerCancel = context.WithCancel(ctx) + go func() { + if err := c.Start(c.consumerCtx); err != nil { + c.log.Error(err, "failed to reconnect(start) the consumer") + } + }() + return nil +} + func (c *GenericConsumer) Start(ctx context.Context) error { receiveContext := ctx if c.enableDatabaseOffset { - offsets, err := getInitOffset(c.clusterIdentity) + offsets, err := getInitOffset(c.clusterID) if err != nil { return err } @@ -188,7 +219,8 @@ func getInitOffset(kafkaClusterIdentity string) ([]kafka.TopicPartition, error) // } func getConfluentReceiverProtocol(transportConfig *transport.TransportConfig, topics []string) (interface{}, error) { - configMap, err := config.GetConfluentConfigMap(transportConfig.KafkaConfig, false) + configMap, err := config.GetConfluentConfigMapByKafkaCredential(transportConfig.KafkaCredential, + transportConfig.ConsumerGroupId) 
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/transport/consumer/generic_consumer_test.go b/pkg/transport/consumer/generic_consumer_test.go
index 8e5805da9..3a385294d 100644
--- a/pkg/transport/consumer/generic_consumer_test.go
+++ b/pkg/transport/consumer/generic_consumer_test.go
@@ -22,16 +22,15 @@ func TestGenerateConsumer(t *testing.T) {
 		t.Errorf("failed to init mock kafka cluster - %v", err)
 	}
 	transportConfig := &transport.TransportConfig{
-		TransportType: "kafka",
-		KafkaConfig: &transport.KafkaConfig{
+		TransportType:   "kafka",
+		ConsumerGroupId: "test-consumer",
+		IsManager:       false,
+		KafkaCredential: &transport.KafkaConnCredential{
 			BootstrapServer: mockKafkaCluster.BootstrapServers(),
-			EnableTLS:       false,
-			ConsumerConfig: &transport.KafkaConsumerConfig{
-				ConsumerID: "test-consumer",
-			},
+			SpecTopic:       "test-topic",
 		},
 	}
-	_, err = NewGenericConsumer(transportConfig, []string{"test-topic"})
+	_, err = NewGenericConsumer(transportConfig)
 	if err != nil && !strings.Contains(err.Error(), "client has run out of available brokers") {
 		t.Errorf("failed to generate consumer - %v", err)
 	}
diff --git a/pkg/transport/controller/controller.go b/pkg/transport/controller/controller.go
new file mode 100644
index 000000000..1eac09d86
--- /dev/null
+++ b/pkg/transport/controller/controller.go
@@ -0,0 +1,165 @@
+package controller
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sync"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+
+	"github.com/stolostron/multicluster-global-hub/pkg/transport"
+	"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
+	"github.com/stolostron/multicluster-global-hub/pkg/transport/consumer"
+	"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
+	"github.com/stolostron/multicluster-global-hub/pkg/utils"
+)
+
+type TransportCallback func(producer transport.Producer, consumer transport.Consumer) error
+
+type TransportCtrl struct {
+	secretNamespace string
+	secretName      string
+	transportConfig *transport.TransportConfig
+
+	// the callback is invoked with the producer and consumer; once it executes successfully, it is cleared.
+	callback TransportCallback
+
+	runtimeClient    client.Client
+	consumer         transport.Consumer
+	producer         transport.Producer
+	extraSecretNames []string
+
+	mutex sync.Mutex
+}
+
+func NewTransportCtrl(namespace, name string, callback TransportCallback,
+	transportConfig *transport.TransportConfig,
+) *TransportCtrl {
+	return &TransportCtrl{
+		secretNamespace:  namespace,
+		secretName:       name,
+		callback:         callback,
+		transportConfig:  transportConfig,
+		extraSecretNames: make([]string, 2),
+	}
+}
+
+func (c *TransportCtrl) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	secret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: c.secretNamespace,
+			Name:      c.secretName,
+		},
+	}
+	if err := c.runtimeClient.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
+		return ctrl.Result{}, err
+	}
+	conn, err := config.GetTransportCredentailBySecret(secret, c.runtimeClient)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	// update the credential secret collections for the predicates
+	if conn.CASecretName != "" && !utils.ContainsString(c.extraSecretNames, conn.CASecretName) {
+		c.extraSecretNames = append(c.extraSecretNames, conn.CASecretName)
+	}
+	if conn.ClientSecretName != "" && !utils.ContainsString(c.extraSecretNames, conn.ClientSecretName) {
+		c.extraSecretNames = append(c.extraSecretNames, conn.ClientSecretName)
+	}
+
+	// if the credential is unchanged, then return
+	if reflect.DeepEqual(c.transportConfig.KafkaCredential, conn) {
+		return ctrl.Result{}, nil
+	}
+	c.transportConfig.KafkaCredential = conn
+
+	// the transport config has changed, so create/update the consumer/producer
+	if c.producer == nil {
+		sender, err := producer.NewGenericProducer(c.transportConfig)
+		if err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to create/update the producer: %w", err)
+		}
+		c.producer = sender
+	} else {
+		if err := c.producer.Reconnect(c.transportConfig); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to reconnect the producer: %w", err)
+		}
+	}
+
+	if c.consumer == nil {
+		receiver, err := consumer.NewGenericConsumer(c.transportConfig)
+		if err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to create the consumer: %w", err)
+		}
+		c.consumer = receiver
+		go func() {
+			if err := c.consumer.Start(ctx); err != nil {
+				klog.Errorf("failed to start the consumer: %v", err)
+			}
+		}()
+	} else {
+		if err := c.consumer.Reconnect(ctx, c.transportConfig); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to reconnect the consumer: %w", err)
+		}
+	}
+	klog.Info("the transport producer and consumer are created/updated from the secret")
+
+	if c.callback != nil {
+		if err := c.callback(c.producer, c.consumer); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to invoke the callback function: %w", err)
+		}
+	}
+
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (c *TransportCtrl) SetupWithManager(mgr ctrl.Manager) error {
+	c.runtimeClient = mgr.GetClient()
+	secretPred := predicate.Funcs{
+		CreateFunc: func(e event.CreateEvent) bool {
+			return c.credentialSecret(e.Object.GetName())
+		},
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			if !c.credentialSecret(e.ObjectNew.GetName()) {
+				return false
+			}
+			newSecret := e.ObjectNew.(*corev1.Secret)
+			oldSecret := e.ObjectOld.(*corev1.Secret)
+			// only enqueue the obj when secret data changed
+			return !reflect.DeepEqual(newSecret.Data, oldSecret.Data)
+		},
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			return false
+		},
+	}
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.Secret{}, builder.WithPredicates(secretPred)).
+		Complete(c)
+}
+
+func (c *TransportCtrl) credentialSecret(name string) bool {
+	if c.secretName == name {
+		return true
+	}
+	if c.extraSecretNames == nil || len(c.extraSecretNames) == 0 {
+		return false
+	}
+	for _, secretName := range c.extraSecretNames {
+		if name == secretName {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/transport/controller/controller_test.go b/pkg/transport/controller/controller_test.go
new file mode 100644
index 000000000..3a9b23e68
--- /dev/null
+++ b/pkg/transport/controller/controller_test.go
@@ -0,0 +1,86 @@
+package controller
+
+import (
+	"context"
+	"encoding/base64"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	"github.com/stolostron/multicluster-global-hub/pkg/transport"
+)
+
+func TestSecretCtrlReconcile(t *testing.T) {
+	// Set up a fake Kubernetes client
+	scheme := runtime.NewScheme()
+	_ = corev1.AddToScheme(scheme)
+
+	fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
+
+	callbackInvoked := false
+
+	secretController := &TransportCtrl{
+		secretNamespace: "default",
+		secretName:      "test-secret",
+		transportConfig: &transport.TransportConfig{
+			TransportType: string(transport.Chan),
+		},
+		callback: func(p transport.Producer, c transport.Consumer) error {
+			callbackInvoked = true
+			return nil
+		},
+		runtimeClient: fakeClient,
+	}
+
+	ctx := context.TODO()
+
+	kafkaConn := &transport.KafkaConnCredential{
+		BootstrapServer: "localhost:3031",
+		StatusTopic:     "event",
+		SpecTopic:       "spec",
+		ClusterID:       "123",
+		// the following fields are only needed by the manager, and by the agent with BYO/standalone kafka
+		CACert:     base64.StdEncoding.EncodeToString([]byte("11")),
+		ClientCert: base64.StdEncoding.EncodeToString([]byte("12")),
+		ClientKey:  base64.StdEncoding.EncodeToString([]byte("13")),
+	}
+
+	kafkaConnYaml, err := kafkaConn.YamlMarshal(false)
+	assert.NoError(t, err)
+
+	secret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "default",
+			Name:      "test-secret",
+		},
+		Data: map[string][]byte{
+			"kafka.yaml": kafkaConnYaml,
+		},
+	}
+	_ = fakeClient.Create(ctx, secret)
+
+	// Reconcile
+	req := reconcile.Request{
+		NamespacedName: types.NamespacedName{
+			Namespace: "default",
+			Name:      "test-secret",
+		},
+	}
+	result, err := secretController.Reconcile(ctx, req)
+	assert.NoError(t, err)
+	assert.False(t, result.Requeue)
+	assert.NotNil(t, secretController.producer)
+	assert.NotNil(t, secretController.consumer)
+	assert.True(t, callbackInvoked)
+
+	// Test when transport config changes
+	result, err = secretController.Reconcile(ctx, req)
+
assert.NoError(t, err) + assert.False(t, result.Requeue) +} diff --git a/pkg/transport/producer/generic_producer.go b/pkg/transport/producer/generic_producer.go index 9f7537d2a..62979ca45 100644 --- a/pkg/transport/producer/generic_producer.go +++ b/pkg/transport/producer/generic_producer.go @@ -10,6 +10,7 @@ import ( kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/protocol/gochan" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-logr/logr" @@ -26,54 +27,22 @@ const ( type GenericProducer struct { log logr.Logger + clientPotocol interface{} client cloudevents.Client messageSizeLimit int } -func NewGenericProducer(transportConfig *transport.TransportConfig, defaultTopic string) (*GenericProducer, error) { - var sender interface{} - var err error - messageSize := DefaultMessageKBSize * 1000 - log := ctrl.Log.WithName(fmt.Sprintf("%s-producer", transportConfig.TransportType)) - - switch transportConfig.TransportType { - case string(transport.Kafka): - if transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > 0 { - messageSize = transportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB * 1000 - } - kafkaProtocol, err := getConfluentSenderProtocol(transportConfig, defaultTopic) - if err != nil { - return nil, err - } - - eventChan, err := kafkaProtocol.Events() - if err != nil { - return nil, err - } - handleProducerEvents(log, eventChan) - sender = kafkaProtocol - case string(transport.Chan): // this go chan protocol is only use for test - if transportConfig.Extends == nil { - transportConfig.Extends = make(map[string]interface{}) - } - if _, found := transportConfig.Extends[defaultTopic]; !found { - transportConfig.Extends[defaultTopic] = gochan.New() - } - sender = transportConfig.Extends[defaultTopic] - default: - return nil, fmt.Errorf("transport-type - %s is not a valid option", transportConfig.TransportType) +func NewGenericProducer(transportConfig *transport.TransportConfig) (*GenericProducer, error) { + genericProducer := &GenericProducer{ + log: ctrl.Log.WithName(fmt.Sprintf("%s-producer", transportConfig.TransportType)), + messageSizeLimit: DefaultMessageKBSize * 1000, } - - client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + err := genericProducer.initClient(transportConfig) if err != nil { return nil, err } - return &GenericProducer{ - log: log, - client: client, - messageSizeLimit: messageSize, - }, nil + return genericProducer, nil } func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error { @@ -108,6 +77,57 @@ func (p *GenericProducer) SendEvent(ctx context.Context, evt cloudevents.Event) return nil } +// Reconnect close the previous producer state and init a new producer +func (p *GenericProducer) Reconnect(config *transport.TransportConfig) error { + closer, ok := p.clientPotocol.(protocol.Closer) + if ok { + if err := closer.Close(context.Background()); err != nil { + return fmt.Errorf("failed to close the previous producer: %w", err) + } + } + return p.initClient(config) +} + +// initClient will init/update the client, clientProtocol and messageLimitSize based on the transportConfig +func (p *GenericProducer) initClient(transportConfig *transport.TransportConfig) error { + topic := transportConfig.KafkaCredential.SpecTopic + if !transportConfig.IsManager { + 
topic = transportConfig.KafkaCredential.StatusTopic + } + + switch transportConfig.TransportType { + case string(transport.Kafka): + kafkaProtocol, err := getConfluentSenderProtocol(transportConfig.KafkaCredential, topic) + if err != nil { + return err + } + + eventChan, err := kafkaProtocol.Events() + if err != nil { + return err + } + handleProducerEvents(p.log, eventChan) + p.clientPotocol = kafkaProtocol + case string(transport.Chan): // this go chan protocol is only use for test + if transportConfig.Extends == nil { + transportConfig.Extends = make(map[string]interface{}) + } + if _, found := transportConfig.Extends[topic]; !found { + transportConfig.Extends[topic] = gochan.New() + } + p.clientPotocol = transportConfig.Extends[topic] + default: + return fmt.Errorf("transport-type - %s is not a valid option", transportConfig.TransportType) + } + + client, err := cloudevents.NewClient(p.clientPotocol, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + return err + } + p.client = client + return nil +} + func (p *GenericProducer) splitPayloadIntoChunks(payload []byte) [][]byte { var chunk []byte chunks := make([][]byte, 0, len(payload)/(p.messageSizeLimit)+1) @@ -125,15 +145,15 @@ func (p *GenericProducer) SetDataLimit(size int) { p.messageSizeLimit = size } -func getSaramaSenderProtocol(transportConfig *transport.TransportConfig, defaultTopic string) (interface{}, error) { - saramaConfig, err := config.GetSaramaConfig(transportConfig.KafkaConfig) +func getSaramaSenderProtocol(kafkaConfig *transport.KafkaConfig, defaultTopic string) (interface{}, error) { + saramaConfig, err := config.GetSaramaConfig(kafkaConfig) if err != nil { return nil, err } // set max message bytes to 1 MB: 1000 000 > config.ProducerConfig.MessageSizeLimitKB * 1000 saramaConfig.Producer.MaxMessageBytes = MaxMessageKBLimit * 1000 saramaConfig.Producer.Return.Successes = true - sender, err := kafka_sarama.NewSender([]string{transportConfig.KafkaConfig.BootstrapServer}, + sender, err := kafka_sarama.NewSender([]string{kafkaConfig.BootstrapServer}, saramaConfig, defaultTopic) if err != nil { return nil, err @@ -141,10 +161,10 @@ func getSaramaSenderProtocol(transportConfig *transport.TransportConfig, default return sender, nil } -func getConfluentSenderProtocol(transportConfig *transport.TransportConfig, +func getConfluentSenderProtocol(kafkaCredentail *transport.KafkaConnCredential, defaultTopic string, ) (*kafka_confluent.Protocol, error) { - configMap, err := config.GetConfluentConfigMap(transportConfig.KafkaConfig, true) + configMap, err := config.GetConfluentConfigMapByKafkaCredential(kafkaCredentail, "") if err != nil { return nil, err } diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 3843d92b6..a2e5def89 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -11,12 +11,14 @@ import ( type Producer interface { SendEvent(ctx context.Context, evt cloudevents.Event) error + Reconnect(config *TransportConfig) error } type Consumer interface { // start the transport to consume message Start(ctx context.Context) error EventChan() chan *cloudevents.Event + Reconnect(ctx context.Context, config *TransportConfig) error } // init the transport with different implementation/protocol: secret, strimzi operator or plain deployment diff --git a/pkg/transport/type.go b/pkg/transport/type.go index bdd76d51a..810fbb6cc 100644 --- a/pkg/transport/type.go +++ b/pkg/transport/type.go @@ -2,6 +2,8 @@ package transport import ( "time" + + 
"sigs.k8s.io/kustomize/kyaml/yaml" ) const ( @@ -38,11 +40,17 @@ const ( ) type TransportConfig struct { - TransportType string - MessageCompressionType string - CommitterInterval time.Duration - KafkaConfig *KafkaConfig - Extends map[string]interface{} + TransportType string + CommitterInterval time.Duration + // IsManager specifies the send/receive topics from specTopic and statusTopic + // For example, SpecTopic sends and statusTopic receives on the manager; the agent is the opposite + IsManager bool + // EnableDatabaseOffset affects only the manager, deciding if consumption starts from a database-stored offset + EnableDatabaseOffset bool + ConsumerGroupId string + // set the credentail in the transport controller + KafkaCredential *KafkaConnCredential + Extends map[string]interface{} } // Kafka Config @@ -89,6 +97,35 @@ type KafkaConnCredential struct { ClientSecretName string `yaml:"client.secret,omitempty"` } +func (k *KafkaConnCredential) YamlMarshal(attachCertSecrets bool) ([]byte, error) { + copy := k.DeepCopy() + if !attachCertSecrets { + copy.CASecretName = "" + copy.ClientSecretName = "" + } else { + copy.CACert = "" + copy.ClientCert = "" + copy.ClientKey = "" + } + bytes, err := yaml.Marshal(copy) + return bytes, err +} + +// DeepCopy creates a deep copy of KafkaConnCredential +func (k *KafkaConnCredential) DeepCopy() *KafkaConnCredential { + return &KafkaConnCredential{ + BootstrapServer: k.BootstrapServer, + StatusTopic: k.StatusTopic, + SpecTopic: k.SpecTopic, + ClusterID: k.ClusterID, + CACert: k.CACert, + ClientCert: k.ClientCert, + ClientKey: k.ClientKey, + CASecretName: k.CASecretName, + ClientSecretName: k.ClientSecretName, + } +} + type EventPosition struct { Topic string `json:"-"` Partition int32 `json:"partition"` diff --git a/test/integration/agent/controller/suite_test.go b/test/integration/agent/controller/suite_test.go index 18e337fbb..d03b09ef0 100644 --- a/test/integration/agent/controller/suite_test.go +++ b/test/integration/agent/controller/suite_test.go @@ -67,7 +67,6 @@ var _ = BeforeSuite(func() { Expect(controllers.AddHubClusterClaimController(mgr)).NotTo(HaveOccurred()) Expect(controllers.AddVersionClusterClaimController(mgr)).NotTo(HaveOccurred()) - Expect(controllers.AddCertController(mgr, kubeClient)).NotTo(HaveOccurred()) go func() { Expect(mgr.Start(ctx)).To(Succeed()) diff --git a/test/integration/agent/spec/clusterlabel_syncer_test.go b/test/integration/agent/spec/clusterlabel_syncer_test.go index c4fc017ab..024a60d73 100644 --- a/test/integration/agent/spec/clusterlabel_syncer_test.go +++ b/test/integration/agent/spec/clusterlabel_syncer_test.go @@ -16,6 +16,7 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/utils" ) +// go test ./test/integration/agent/spec -v -ginkgo.focus "ManagerClusterLabelBundle" var _ = Describe("ManagerClusterLabelBundle", func() { It("sync managedclusterlabel bundle", func() { managedClusterName := "mc1" diff --git a/test/integration/agent/spec/suite_test.go b/test/integration/agent/spec/suite_test.go index 4f2d47a74..ceb92e169 100644 --- a/test/integration/agent/spec/suite_test.go +++ b/test/integration/agent/spec/suite_test.go @@ -18,6 +18,7 @@ import ( "github.com/stolostron/multicluster-global-hub/agent/pkg/config" speccontroller "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller" "github.com/stolostron/multicluster-global-hub/pkg/transport" + genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" genericproducer 
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer" ) @@ -34,6 +35,7 @@ var ( cancel context.CancelFunc runtimeClient runtimeclient.Client genericProducer transport.Producer + genericConsumer transport.Consumer ) var _ = BeforeSuite(func() { @@ -44,12 +46,12 @@ var _ = BeforeSuite(func() { leafHubName = "spec-hub" agentConfig = &config.AgentConfig{ TransportConfig: &transport.TransportConfig{ - TransportType: string(transport.Chan), - KafkaConfig: &transport.KafkaConfig{ - Topics: &transport.ClusterTopic{ - SpecTopic: "spec", - }, - ConsumerConfig: &transport.KafkaConsumerConfig{}, + TransportType: string(transport.Chan), + IsManager: false, + ConsumerGroupId: "agent", + KafkaCredential: &transport.KafkaConnCredential{ + SpecTopic: "spec", + StatusTopic: "spec", }, }, SpecWorkPoolSize: 2, @@ -74,8 +76,24 @@ var _ = BeforeSuite(func() { Scheme: config.GetRuntimeScheme(), }) Expect(err).NotTo(HaveOccurred()) + runtimeClient = mgr.GetClient() + + agentConfig.TransportConfig.IsManager = false + genericConsumer, err = genericconsumer.NewGenericConsumer(agentConfig.TransportConfig) + Expect(err).NotTo(HaveOccurred()) + + go func() { + if err := genericConsumer.Start(ctx); err != nil { + logf.Log.Error(err, "error to start the chan consumer") + } + }() + Expect(err).NotTo(HaveOccurred()) + + agentConfig.TransportConfig.IsManager = true + genericProducer, err = genericproducer.NewGenericProducer(agentConfig.TransportConfig) + Expect(err).NotTo(HaveOccurred()) - err = speccontroller.AddToManager(mgr, agentConfig) + err = speccontroller.AddToManager(mgr, genericConsumer, agentConfig) Expect(err).NotTo(HaveOccurred()) go func() { @@ -84,10 +102,6 @@ var _ = BeforeSuite(func() { By("Waiting for the manager to be ready") Expect(mgr.GetCache().WaitForCacheSync(ctx)).To(BeTrue()) - - runtimeClient = mgr.GetClient() - genericProducer, err = genericproducer.NewGenericProducer(agentConfig.TransportConfig, "spec") - Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { diff --git a/test/integration/agent/status/suite_test.go b/test/integration/agent/status/suite_test.go index 8be1d6e68..4ea5a67b4 100644 --- a/test/integration/agent/status/suite_test.go +++ b/test/integration/agent/status/suite_test.go @@ -78,11 +78,10 @@ var _ = BeforeSuite(func() { TransportConfig: &transport.TransportConfig{ CommitterInterval: 1 * time.Second, TransportType: string(transport.Chan), - KafkaConfig: &transport.KafkaConfig{ - Topics: &transport.ClusterTopic{ - StatusTopic: "event", - SpecTopic: "spec", - }, + IsManager: false, + KafkaCredential: &transport.KafkaConnCredential{ + SpecTopic: "spec", + StatusTopic: "event", }, }, EnableGlobalResource: true, @@ -221,18 +220,31 @@ func NewChanTransport(mgr ctrl.Manager, transConfig *transport.TransportConfig, consumers: map[string]transport.Consumer{}, producers: map[string]transport.Producer{}, } + for _, topic := range topics { - consumer, err := genericconsumer.NewGenericConsumer(transConfig, []string{topic}) + + // mock the consumer in manager + transConfig.IsManager = true + transConfig.EnableDatabaseOffset = false + transConfig.KafkaCredential.StatusTopic = topic + consumer, err := genericconsumer.NewGenericConsumer(transConfig) if err != nil { return trans, err } - if err = mgr.Add(consumer); err != nil { - return trans, err - } - producer, err := genericproducer.NewGenericProducer(transConfig, topic) + go func() { + if err := consumer.Start(ctx); err != nil { + logf.Log.Error(err, "error to start the chan consumer") + } + }() + 
Expect(err).NotTo(HaveOccurred()) + + // mock the producer in agent + transConfig.IsManager = false + producer, err := genericproducer.NewGenericProducer(transConfig) if err != nil { return trans, err } + trans.consumers[topic] = consumer trans.producers[topic] = producer } diff --git a/test/integration/manager/controller/hub_management_test.go b/test/integration/manager/controller/hub_management_test.go index fb2d5a006..ead395c9e 100644 --- a/test/integration/manager/controller/hub_management_test.go +++ b/test/integration/manager/controller/hub_management_test.go @@ -13,6 +13,7 @@ import ( "github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement" "github.com/stolostron/multicluster-global-hub/pkg/database" "github.com/stolostron/multicluster-global-hub/pkg/database/models" + "github.com/stolostron/multicluster-global-hub/pkg/transport" ) var _ = Describe("hub management", func() { @@ -102,3 +103,7 @@ type tmpProducer struct{} func (p *tmpProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error { return nil } + +func (p *tmpProducer) Reconnect(config *transport.TransportConfig) error { + return nil +} diff --git a/test/integration/manager/spec/suite_test.go b/test/integration/manager/spec/suite_test.go index 05bd181c5..3e69160c9 100644 --- a/test/integration/manager/spec/suite_test.go +++ b/test/integration/manager/spec/suite_test.go @@ -108,6 +108,10 @@ var _ = BeforeSuite(func() { TransportConfig: &transport.TransportConfig{ TransportType: string(transport.Chan), CommitterInterval: 10 * time.Second, + KafkaCredential: &transport.KafkaConnCredential{ + SpecTopic: "spec", + StatusTopic: "event", + }, }, StatisticsConfig: &statistics.StatisticsConfig{}, NonK8sAPIServerConfig: &nonk8sapi.NonK8sAPIServerConfig{}, @@ -115,9 +119,11 @@ var _ = BeforeSuite(func() { } By("Create consumer/producer") - producer, err = genericproducer.NewGenericProducer(managerConfig.TransportConfig, "spec") + managerConfig.TransportConfig.IsManager = true + producer, err = genericproducer.NewGenericProducer(managerConfig.TransportConfig) Expect(err).NotTo(HaveOccurred()) - consumer, err = genericconsumer.NewGenericConsumer(managerConfig.TransportConfig, []string{"spec"}) + managerConfig.TransportConfig.IsManager = false + consumer, err = genericconsumer.NewGenericConsumer(managerConfig.TransportConfig) Expect(err).NotTo(HaveOccurred()) By("Add db to transport") diff --git a/test/integration/manager/status/suite_test.go b/test/integration/manager/status/suite_test.go index 93fd34e1c..1c780aec7 100644 --- a/test/integration/manager/status/suite_test.go +++ b/test/integration/manager/status/suite_test.go @@ -22,6 +22,7 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/database" "github.com/stolostron/multicluster-global-hub/pkg/statistics" "github.com/stolostron/multicluster-global-hub/pkg/transport" + genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer" genericproducer "github.com/stolostron/multicluster-global-hub/pkg/transport/producer" "github.com/stolostron/multicluster-global-hub/test/integration/utils/testpostgres" ) @@ -84,10 +85,9 @@ var _ = BeforeSuite(func() { managerConfig := &config.ManagerConfig{ TransportConfig: &transport.TransportConfig{ TransportType: string(transport.Chan), - KafkaConfig: &transport.KafkaConfig{ - Topics: &transport.ClusterTopic{ - StatusTopic: "event", - }, + KafkaCredential: &transport.KafkaConnCredential{ + SpecTopic: "spec", + StatusTopic: "event", }, }, StatisticsConfig: &statistics.StatisticsConfig{ 
@@ -97,11 +97,17 @@ var _ = BeforeSuite(func() { } By("Start cloudevents producer and consumer") - producer, err = genericproducer.NewGenericProducer(managerConfig.TransportConfig, "event") + managerConfig.TransportConfig.IsManager = false + producer, err = genericproducer.NewGenericProducer(managerConfig.TransportConfig) Expect(err).NotTo(HaveOccurred()) + managerConfig.TransportConfig.IsManager = true + consumer, err := genericconsumer.NewGenericConsumer(managerConfig.TransportConfig) + Expect(err).NotTo(HaveOccurred()) + Expect(mgr.Add(consumer)).Should(Succeed()) + By("Add controllers to manager") - err = statussyncer.AddStatusSyncers(mgr, managerConfig) + err = statussyncer.AddStatusSyncers(mgr, consumer, managerConfig) Expect(err).ToNot(HaveOccurred()) By("Start the manager") diff --git a/test/manifest/kafka/kafka-cluster/kafka-cluster.yaml b/test/manifest/kafka/kafka-cluster/kafka-cluster.yaml index 804541dba..9ae4c9e06 100644 --- a/test/manifest/kafka/kafka-cluster/kafka-cluster.yaml +++ b/test/manifest/kafka/kafka-cluster/kafka-cluster.yaml @@ -16,7 +16,7 @@ spec: type: simple config: auto.create.topics.enable: "false" - inter.broker.protocol.version: 3.6 + inter.broker.protocol.version: 3.7 offsets.topic.replication.factor: 1 ssl.cipher.suites: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 ssl.enabled.protocols: TLSv1.2 diff --git a/test/script/e2e_kafka.sh b/test/script/e2e_kafka.sh index 03b28e7d3..65fa3aa92 100755 --- a/test/script/e2e_kafka.sh +++ b/test/script/e2e_kafka.sh @@ -70,6 +70,7 @@ byo_user=global-hub-byo-user wait_cmd "kubectl get kafkauser $byo_user -n $target_namespace | grep -C 1 True" # generate transport secret +wait_cmd "kubectl get kafka kafka -n $target_namespace -o jsonpath='{.status.listeners[1]}' | grep bootstrapServers" bootstrap_server=$(kubectl get kafka kafka -n "$target_namespace" -o jsonpath='{.status.listeners[1].bootstrapServers}') kubectl get kafka kafka -n "$target_namespace" -o jsonpath='{.status.listeners[1].certificates[0]}' >"$CURRENT_DIR"/config/kafka-ca-cert.pem kubectl get secret $byo_user -n "$target_namespace" -o jsonpath='{.data.user\.crt}' | base64 -d >"$CURRENT_DIR"/config/kafka-client-cert.pem diff --git a/test/script/e2e_log.sh b/test/script/e2e_log.sh index c9e44afce..73f2dfff1 100755 --- a/test/script/e2e_log.sh +++ b/test/script/e2e_log.sh @@ -25,10 +25,10 @@ kubectl logs deployment/"$COMPONENT" -n "$NAMESPACE" --all-containers=true [ "$COMPONENT" != "multicluster-global-hub-operator" ] && exit 0 echo ">>>> KafkaCluster" -kubectl get kafka -n "$CONFIG_DIR/hub2" -oyaml +kubectl get kafka -n "$NAMESPACE" --kubeconfig="$CONFIG_DIR/hub2" -oyaml echo ">>>> KafkaUsers" -kubectl get kafkauser -n "$CONFIG_DIR/hub2" -oyaml +kubectl get kafkauser -n "$NAMESPACE" --kubeconfig="$CONFIG_DIR/hub2" -oyaml echo ">>>> KafkaTopics" -kubectl get kafkatopics -n "$CONFIG_DIR/hub2" -oyaml +kubectl get kafkatopics -n "$NAMESPACE" --kubeconfig="$CONFIG_DIR/hub2" -oyaml
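Note on the new credential flow: the transport controller watches a single secret whose kafka.yaml key carries the marshalled KafkaConnCredential, as exercised in controller_test.go above. The sketch below shows how such a secret could be built outside the test, assuming only the types visible in this diff; the package and helper names are illustrative, not part of this change.

package transportexample // illustrative package name

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

// buildTransportSecret is a hypothetical helper: it marshals the connection
// credential into the "kafka.yaml" key that TransportCtrl.Reconcile parses.
// YamlMarshal(false) keeps the inline CA/client certs and drops the
// CASecretName/ClientSecretName references.
func buildTransportSecret(namespace, name string, conn *transport.KafkaConnCredential) (*corev1.Secret, error) {
	payload, err := conn.YamlMarshal(false)
	if err != nil {
		return nil, err
	}
	return &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
		Data:       map[string][]byte{"kafka.yaml": payload},
	}, nil
}

Because the Update predicate only enqueues the watched secrets when their data changes, rotating the credential should only require updating this secret (or the referenced CA/client secrets, which the predicate also admits once their names are recorded in extraSecretNames).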
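Since the Producer and Consumer interfaces now require Reconnect, test doubles such as tmpProducer in hub_management_test.go have to grow the new method. A self-contained no-op sketch of both sides follows; the type and package names are illustrative, and the compile-time assertions assume the interfaces contain only the methods visible in this diff.

package transportfakes // illustrative package name

import (
	"context"

	cloudevents "github.com/cloudevents/sdk-go/v2"

	"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

// fakeProducer and fakeConsumer are hypothetical no-op doubles for tests
// that do not exercise the transport layer.
type fakeProducer struct{}

func (p *fakeProducer) SendEvent(ctx context.Context, evt cloudevents.Event) error { return nil }

func (p *fakeProducer) Reconnect(config *transport.TransportConfig) error { return nil }

type fakeConsumer struct {
	events chan *cloudevents.Event
}

// Start blocks until the context is cancelled; a real consumer would pump events here.
func (c *fakeConsumer) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func (c *fakeConsumer) EventChan() chan *cloudevents.Event { return c.events }

func (c *fakeConsumer) Reconnect(ctx context.Context, config *transport.TransportConfig) error {
	return nil
}

// compile-time checks that the doubles keep tracking the interfaces
var (
	_ transport.Producer = (*fakeProducer)(nil)
	_ transport.Consumer = (*fakeConsumer)(nil)
)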