diff --git a/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go b/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go index 1fb7cca0f..db4857a7b 100644 --- a/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go +++ b/operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go @@ -5,14 +5,10 @@ package hubofhubs import ( "context" - "fmt" kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" "github.com/go-logr/logr" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/client-go/discovery" - "k8s.io/client-go/discovery/cached/memory" - "k8s.io/client-go/restmapper" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/event" @@ -21,8 +17,6 @@ import ( globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" - "github.com/stolostron/multicluster-global-hub/operator/pkg/deployer" - "github.com/stolostron/multicluster-global-hub/operator/pkg/renderer" "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/utils" ) @@ -43,10 +37,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) ( r.Log.Error(err, "failed to get MulticlusterGlobalHub") return ctrl.Result{}, err } - err = r.renderKafkaMetricsResources(mgh) - if err != nil { - return ctrl.Result{}, err - } + r.conn, err = r.globalHubReconciler.ReconcileTransport(ctx, mgh, transport.StrimziTransporter) if err != nil { r.Log.Error(err, "failed to get connection from kafka reconciler") @@ -97,36 +88,6 @@ func startKafkaController(ctx context.Context, mgr ctrl.Manager, return r, nil } -// renderKafkaMetricsResources renders the kafka podmonitor and metrics -func (r *KafkaController) renderKafkaMetricsResources(mgh *globalhubv1alpha4.MulticlusterGlobalHub) error { - if mgh.Spec.EnableMetrics { - // render the kafka objects - kafkaRenderer, kafkaDeployer := renderer.NewHoHRenderer(fs), deployer.NewHoHDeployer(r.mgr.GetClient()) - kafkaObjects, err := kafkaRenderer.Render("manifests/kafka", "", - func(profile string) (interface{}, error) { - return struct { - Namespace string - }{ - Namespace: utils.GetDefaultNamespace(), - }, nil - }) - if err != nil { - return fmt.Errorf("failed to render kafka manifests: %w", err) - } - // create restmapper for deployer to find GVR - dc, err := discovery.NewDiscoveryClientForConfig(r.mgr.GetConfig()) - if err != nil { - return err - } - mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc)) - - if err = manipulateObj(kafkaObjects, mgh, kafkaDeployer, mapper, r.mgr.GetScheme()); err != nil { - return fmt.Errorf("failed to create/update kafka objects: %w", err) - } - } - return nil -} - type kafkaCRDController struct { mgr ctrl.Manager globaHubReconciler *MulticlusterGlobalHubReconciler diff --git a/operator/pkg/controllers/hubofhubs/globalhub_middleware.go b/operator/pkg/controllers/hubofhubs/globalhub_middleware.go index f5ec05336..bebfdee77 100644 --- a/operator/pkg/controllers/hubofhubs/globalhub_middleware.go +++ b/operator/pkg/controllers/hubofhubs/globalhub_middleware.go @@ -27,12 +27,18 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/restmapper" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" + globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" + "github.com/stolostron/multicluster-global-hub/operator/pkg/deployer" "github.com/stolostron/multicluster-global-hub/operator/pkg/postgres" + "github.com/stolostron/multicluster-global-hub/operator/pkg/renderer" transportprotocol "github.com/stolostron/multicluster-global-hub/operator/pkg/transporter" operatorutils "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" @@ -83,6 +89,7 @@ func (r *MulticlusterGlobalHubReconciler) ReconcileMiddleware(ctx context.Contex errorChan <- err return } + r.KafkaInit = true } if r.KafkaController == nil || r.KafkaController.conn == nil { @@ -121,9 +128,14 @@ func (r *MulticlusterGlobalHubReconciler) ReconcileMiddleware(ctx context.Contex func (r *MulticlusterGlobalHubReconciler) ReconcileTransport(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub, transProtocol transport.TransportProtocol, ) (*transport.ConnCredential, error) { + // apply kafka metric resources + err := r.renderKafkaMetricsResources(mgh) + if err != nil { + return nil, err + } + // create the transport instance var trans transport.Transporter - var err error switch transProtocol { case transport.StrimziTransporter: trans, err = transportprotocol.NewStrimziTransporter( @@ -251,3 +263,34 @@ func detectTransportProtocol(ctx context.Context, runtimeClient client.Client) ( // the transport secret is not found return transport.StrimziTransporter, nil } + +// renderKafkaMetricsResources renders the kafka podmonitor and metrics +func (r *MulticlusterGlobalHubReconciler) renderKafkaMetricsResources( + mgh *globalhubv1alpha4.MulticlusterGlobalHub) error { + if mgh.Spec.EnableMetrics { + // render the kafka objects + kafkaRenderer, kafkaDeployer := renderer.NewHoHRenderer(fs), deployer.NewHoHDeployer(r.Client) + kafkaObjects, err := kafkaRenderer.Render("manifests/kafka", "", + func(profile string) (interface{}, error) { + return struct { + Namespace string + }{ + Namespace: utils.GetDefaultNamespace(), + }, nil + }) + if err != nil { + return fmt.Errorf("failed to render kafka manifests: %w", err) + } + // create restmapper for deployer to find GVR + dc, err := discovery.NewDiscoveryClientForConfig(r.Manager.GetConfig()) + if err != nil { + return err + } + mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc)) + + if err = manipulateObj(kafkaObjects, mgh, kafkaDeployer, mapper, r.Manager.GetScheme()); err != nil { + return fmt.Errorf("failed to create/update kafka objects: %w", err) + } + } + return nil +} diff --git a/operator/pkg/controllers/hubofhubs/integration_test.go b/operator/pkg/controllers/hubofhubs/integration_test.go index ff376b2e5..dae5cd440 100644 --- a/operator/pkg/controllers/hubofhubs/integration_test.go +++ b/operator/pkg/controllers/hubofhubs/integration_test.go @@ -924,7 +924,9 @@ var _ = Describe("MulticlusterGlobalHub controller", Ordered, func() { operatorconstants.AnnotationMGHInstallCrunchyOperator: "true", }, }, - Spec: globalhubv1alpha4.MulticlusterGlobalHubSpec{}, + Spec: globalhubv1alpha4.MulticlusterGlobalHubSpec{ + EnableMetrics: true, + }, } Expect(k8sClient.Create(ctx, mcgh)).Should(Succeed()) })