Skip to content

Commit

Permalink
fix kafka cluster error when enable metric by default
Browse files Browse the repository at this point in the history
Signed-off-by: ldpliu <[email protected]>
  • Loading branch information
ldpliu committed Apr 11, 2024
1 parent 02d6159 commit eb17e42
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 42 deletions.
41 changes: 1 addition & 40 deletions operator/pkg/controllers/hubofhubs/globalhub_kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@ package hubofhubs

import (
"context"
"fmt"

kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
"github.com/go-logr/logr"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/restmapper"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -21,8 +17,6 @@ import (

globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
"github.com/stolostron/multicluster-global-hub/operator/pkg/deployer"
"github.com/stolostron/multicluster-global-hub/operator/pkg/renderer"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)
Expand All @@ -43,10 +37,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) (
r.Log.Error(err, "failed to get MulticlusterGlobalHub")
return ctrl.Result{}, err
}
err = r.renderKafkaMetricsResources(mgh)
if err != nil {
return ctrl.Result{}, err
}

r.conn, err = r.globalHubReconciler.ReconcileTransport(ctx, mgh, transport.StrimziTransporter)
if err != nil {
r.Log.Error(err, "failed to get connection from kafka reconciler")
Expand Down Expand Up @@ -97,36 +88,6 @@ func startKafkaController(ctx context.Context, mgr ctrl.Manager,
return r, nil
}

// renderKafkaMetricsResources renders the kafka podmonitor and metrics
func (r *KafkaController) renderKafkaMetricsResources(mgh *globalhubv1alpha4.MulticlusterGlobalHub) error {
if mgh.Spec.EnableMetrics {
// render the kafka objects
kafkaRenderer, kafkaDeployer := renderer.NewHoHRenderer(fs), deployer.NewHoHDeployer(r.mgr.GetClient())
kafkaObjects, err := kafkaRenderer.Render("manifests/kafka", "",
func(profile string) (interface{}, error) {
return struct {
Namespace string
}{
Namespace: utils.GetDefaultNamespace(),
}, nil
})
if err != nil {
return fmt.Errorf("failed to render kafka manifests: %w", err)
}
// create restmapper for deployer to find GVR
dc, err := discovery.NewDiscoveryClientForConfig(r.mgr.GetConfig())
if err != nil {
return err
}
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))

if err = manipulateObj(kafkaObjects, mgh, kafkaDeployer, mapper, r.mgr.GetScheme()); err != nil {
return fmt.Errorf("failed to create/update kafka objects: %w", err)
}
}
return nil
}

type kafkaCRDController struct {
mgr ctrl.Manager
globaHubReconciler *MulticlusterGlobalHubReconciler
Expand Down
45 changes: 44 additions & 1 deletion operator/pkg/controllers/hubofhubs/globalhub_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/restmapper"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
"github.com/stolostron/multicluster-global-hub/operator/pkg/deployer"
"github.com/stolostron/multicluster-global-hub/operator/pkg/postgres"
"github.com/stolostron/multicluster-global-hub/operator/pkg/renderer"
transportprotocol "github.com/stolostron/multicluster-global-hub/operator/pkg/transporter"
operatorutils "github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
"github.com/stolostron/multicluster-global-hub/pkg/constants"
Expand Down Expand Up @@ -83,6 +89,7 @@ func (r *MulticlusterGlobalHubReconciler) ReconcileMiddleware(ctx context.Contex
errorChan <- err
return
}

r.KafkaInit = true
}
if r.KafkaController == nil || r.KafkaController.conn == nil {
Expand Down Expand Up @@ -121,9 +128,14 @@ func (r *MulticlusterGlobalHubReconciler) ReconcileMiddleware(ctx context.Contex
func (r *MulticlusterGlobalHubReconciler) ReconcileTransport(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub,
transProtocol transport.TransportProtocol,
) (*transport.ConnCredential, error) {
// apply kafka metric resources
err := r.renderKafkaMetricsResources(mgh)
if err != nil {
return nil, err
}

// create the transport instance
var trans transport.Transporter
var err error
switch transProtocol {
case transport.StrimziTransporter:
trans, err = transportprotocol.NewStrimziTransporter(
Expand Down Expand Up @@ -251,3 +263,34 @@ func detectTransportProtocol(ctx context.Context, runtimeClient client.Client) (
// the transport secret is not found
return transport.StrimziTransporter, nil
}

// renderKafkaMetricsResources renders the kafka podmonitor and metrics
func (r *MulticlusterGlobalHubReconciler) renderKafkaMetricsResources(
mgh *globalhubv1alpha4.MulticlusterGlobalHub) error {
if mgh.Spec.EnableMetrics {
// render the kafka objects
kafkaRenderer, kafkaDeployer := renderer.NewHoHRenderer(fs), deployer.NewHoHDeployer(r.Client)
kafkaObjects, err := kafkaRenderer.Render("manifests/kafka", "",
func(profile string) (interface{}, error) {
return struct {
Namespace string
}{
Namespace: utils.GetDefaultNamespace(),
}, nil
})
if err != nil {
return fmt.Errorf("failed to render kafka manifests: %w", err)
}
// create restmapper for deployer to find GVR
dc, err := discovery.NewDiscoveryClientForConfig(r.Manager.GetConfig())
if err != nil {
return err
}
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))

if err = manipulateObj(kafkaObjects, mgh, kafkaDeployer, mapper, r.Manager.GetScheme()); err != nil {
return fmt.Errorf("failed to create/update kafka objects: %w", err)
}
}
return nil
}
4 changes: 3 additions & 1 deletion operator/pkg/controllers/hubofhubs/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,9 @@ var _ = Describe("MulticlusterGlobalHub controller", Ordered, func() {
operatorconstants.AnnotationMGHInstallCrunchyOperator: "true",
},
},
Spec: globalhubv1alpha4.MulticlusterGlobalHubSpec{},
Spec: globalhubv1alpha4.MulticlusterGlobalHubSpec{
EnableMetrics: true,
},
}
Expect(k8sClient.Create(ctx, mcgh)).Should(Succeed())
})
Expand Down

0 comments on commit eb17e42

Please sign in to comment.