Skip to content

Commit

Permalink
handle upgrade kafka from zookeeper to kraft
Browse files Browse the repository at this point in the history
Signed-off-by: root <[email protected]>
  • Loading branch information
ldpliu committed Dec 17, 2024
1 parent eeccbd6 commit d7493c6
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 23 deletions.
2 changes: 1 addition & 1 deletion operator/pkg/controllers/inventory/inventory_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (r *InventoryReconciler) Reconcile(ctx context.Context,

transportConn := config.GetTransporterConn()
if transportConn == nil || transportConn.BootstrapServer == "" {
reconcileErr = fmt.Errorf("the transport connection(%s) must not be empty", transportConn)
reconcileErr = fmt.Errorf("the transport connection(%v) must not be empty", transportConn)
return ctrl.Result{}, reconcileErr
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: kafka
name: kraft
labels:
strimzi.io/cluster: {{.KafkaCluster}}
namespace: {{.Namespace}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
Expand Down Expand Up @@ -91,6 +93,7 @@ type strimziTransporter struct {
enableTLS bool
// default is false, to create topic for each managed hub
sharedTopics bool
isNewKafkaCluster bool
topicPartitionReplicas int32
}

Expand Down Expand Up @@ -179,7 +182,15 @@ func WithSubName(name string) KafkaOption {
// EnsureKafka the kafka subscription, cluster, metrics, global hub user and topic
func (k *strimziTransporter) EnsureKafka() (bool, error) {
log.Debug("reconcile global hub kafka transport...")
err := k.ensureSubscription(k.mgh)

// Delete the old ZooKeeper-based kafka cluster. Only used to handle the upgrade from globalhub 1.3 -> 1.4
// TODO: Remove the code in globalhub 1.5
err := k.RemoveOldKafkaCluster(k.mgh.Namespace)
if err != nil {
return true, err
}

err = k.ensureSubscription(k.mgh)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -207,6 +218,39 @@ func (k *strimziTransporter) EnsureKafka() (bool, error) {
return false, nil
}

// RemoveOldKafkaCluster deletes the old ZooKeeper-based Kafka cluster so a new
// KRaft-based cluster can be created in its place (globalhub 1.3 -> 1.4 upgrade).
// It is a no-op when the Kafka CRD is not registered, the cluster does not
// exist, or the cluster is already KRaft-based (no ZooKeeper spec).
// TODO: Should be removed in globalhub 1.5
func (k *strimziTransporter) RemoveOldKafkaCluster(ns string) error {
	// Skip when the Kafka CRD is not registered in the cluster; otherwise the
	// Get below would fail on every reconcile before strimzi is installed.
	if _, err := k.manager.GetRESTMapper().KindFor(schema.GroupVersionResource{
		Group:    "kafka.strimzi.io",
		Version:  "v1beta2",
		Resource: "kafkas",
	}); err != nil {
		if meta.IsNoMatchError(err) {
			return nil
		}
		return err
	}
	existingKafka := &kafkav1beta2.Kafka{}
	err := k.manager.GetClient().Get(k.ctx, types.NamespacedName{
		Name:      k.kafkaClusterName,
		Namespace: ns,
	}, existingKafka)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	// A nil ZooKeeper spec means the cluster is already KRaft-based.
	if existingKafka.Spec.Zookeeper == nil {
		return nil
	}
	log.Info("delete old kafka cluster")
	// Record that the cluster is being recreated so downstream consumers
	// (e.g. GetConnCredential / the transport controller) re-issue client secrets.
	k.isNewKafkaCluster = true
	return k.manager.GetClient().Delete(k.ctx, existingKafka)
}

// renderKafkaResources renders the kafka podmonitor and metrics, and kafkaUser and kafkaTopic for global hub
func (k *strimziTransporter) renderKafkaResources(mgh *operatorv1alpha4.MulticlusterGlobalHub) error {
statusTopic := config.GetRawStatusTopic()
Expand Down Expand Up @@ -448,7 +492,7 @@ func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.K
// topics
credential.StatusTopic = config.GetStatusTopic(clusterName)
credential.SpecTopic = config.GetSpecTopic()

credential.IsNewKafkaCluster = k.isNewKafkaCluster
// don't need to load the client cert/key from the kafka user, since it uses the external kafkaUser
// userName := config.GetKafkaUserName(clusterName)
// if !k.enableTLS {
Expand Down
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ const (
ManagedClusterManagedByAnnotation = "global-hub.open-cluster-management.io/managed-by"
// identify the resource is from the global hub cluster
OriginOwnerReferenceAnnotation = "global-hub.open-cluster-management.io/origin-ownerreference-uid"

AnnotationResyncKafkaClientSecret = "resync-kafka-client-secret"
)

// store all the finalizers
Expand Down
38 changes: 37 additions & 1 deletion pkg/transport/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/stolostron/multicluster-global-hub/pkg/constants"
"github.com/stolostron/multicluster-global-hub/pkg/logger"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
Expand Down Expand Up @@ -223,7 +224,10 @@ func (c *TransportCtrl) ReconcileKafkaCredential(ctx context.Context, secret *co
if err != nil {
return false, err
}

err = c.ResyncKafkaClientSecret(ctx, kafkaConn, secret)
if err != nil {
return false, err
}
// update the watched secret list
if kafkaConn.CASecretName != "" || !utils.ContainsString(c.extraSecretNames, kafkaConn.CASecretName) {
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.CASecretName)
Expand All @@ -240,6 +244,38 @@ func (c *TransportCtrl) ReconcileKafkaCredential(ctx context.Context, secret *co
return true, nil
}

// ResyncKafkaClientSecret forces the kafka client certificate secret to be
// re-issued after the kafka cluster has been recreated (ZooKeeper -> KRaft
// upgrade). It deletes the signed client secret so a fresh one is generated,
// then annotates the transport config secret so the resync runs only once.
func (c *TransportCtrl) ResyncKafkaClientSecret(ctx context.Context, kafkaConn *transport.KafkaConfig, secret *corev1.Secret) error {
	log.Debugf("resync kafka client secret: %v", kafkaConn.IsNewKafkaCluster)
	if !kafkaConn.IsNewKafkaCluster {
		return nil
	}
	// The annotation marks a completed resync; skip if it is already present.
	if secret.Annotations != nil {
		if _, ok := secret.Annotations[constants.AnnotationResyncKafkaClientSecret]; ok {
			return nil
		}
	}
	signedSecret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      kafkaConn.ClientSecretName,
			Namespace: secret.Namespace,
		},
	}
	log.Infof("remove kafka client secret: %v", kafkaConn.ClientSecretName)

	// Tolerate a missing secret: if a previous reconcile deleted it but failed
	// before recording the annotation, the retry must not get stuck on NotFound.
	if err := c.runtimeClient.Delete(ctx, signedSecret); err != nil && !apierrors.IsNotFound(err) {
		return err
	}
	transportSecret := secret.DeepCopy()
	if transportSecret.Annotations == nil {
		transportSecret.Annotations = make(map[string]string)
	}
	log.Infof("update transport config secret")

	transportSecret.Annotations[constants.AnnotationResyncKafkaClientSecret] = "true"
	return c.runtimeClient.Update(ctx, transportSecret)
}

func (c *TransportCtrl) ReconcileRestfulCredential(ctx context.Context, secret *corev1.Secret) (
updated bool, err error,
) {
Expand Down
38 changes: 20 additions & 18 deletions pkg/transport/kafka_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import "sigs.k8s.io/kustomize/kyaml/yaml"
// KafkaConfig is used to connect the transporter instance. The fields are
// persisted to a secret and need to be encoded with
// base64.StdEncoding.EncodeToString.
type KafkaConfig struct {
	BootstrapServer  string `yaml:"bootstrap.server"`
	StatusTopic      string `yaml:"topic.status,omitempty"`
	SpecTopic        string `yaml:"topic.spec,omitempty"`
	ClusterID        string `yaml:"cluster.id,omitempty"`
	CACert           string `yaml:"ca.crt,omitempty"`
	ClientCert       string `yaml:"client.crt,omitempty"`
	ClientKey        string `yaml:"client.key,omitempty"`
	CASecretName     string `yaml:"ca.secret,omitempty"`
	ClientSecretName string `yaml:"client.secret,omitempty"`
	// IsNewKafkaCluster is true when the kafka cluster was recreated
	// (ZooKeeper -> KRaft upgrade) and the client secret must be re-issued.
	IsNewKafkaCluster bool `yaml:"isNewKafkaCluster,omitempty"`
}

// YamlMarshal marshal the connection credential object, rawCert specifies whether to keep the cert in the data directly
Expand All @@ -34,15 +35,16 @@ func (k *KafkaConfig) YamlMarshal(rawCert bool) ([]byte, error) {
// DeepCopy creates a deep copy of KafkaConfig. All fields are value types
// (strings and a bool), so a field-by-field copy yields a fully independent
// object.
func (k *KafkaConfig) DeepCopy() *KafkaConfig {
	return &KafkaConfig{
		BootstrapServer:   k.BootstrapServer,
		StatusTopic:       k.StatusTopic,
		SpecTopic:         k.SpecTopic,
		ClusterID:         k.ClusterID,
		CACert:            k.CACert,
		ClientCert:        k.ClientCert,
		ClientKey:         k.ClientKey,
		CASecretName:      k.CASecretName,
		ClientSecretName:  k.ClientSecretName,
		IsNewKafkaCluster: k.IsNewKafkaCluster,
	}
}

Expand Down

0 comments on commit d7493c6

Please sign in to comment.