Skip to content

Commit

Permalink
handle upgrade kafka from zookeeper to kraft
Browse files Browse the repository at this point in the history
Signed-off-by: root <[email protected]>
  • Loading branch information
ldpliu committed Dec 19, 2024
1 parent eeccbd6 commit b36ba67
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 23 deletions.
1 change: 1 addition & 0 deletions doc/reserved_names/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ List all annotations are used by multicluster global hub.
|global-hub.open-cluster-management.io/import-cluster-in-hosted=true\|false | This annotation is used to identify if managedhub cluster should be imported in hosted mode |
| global-hub.open-cluster-management.io/with-inventory | This annotation is used to identify the common inventory is deployed. |
| global-hub.open-cluster-management.io/with-stackrox-integration | This annotation enables the experimental integration with [Stackrox](https://github.com/stackrox).|
| global-hub.open-cluster-management.io/resign-kafka-client-secret | This annotation marks that the kafka client secret has been resynced in the agent (set once after the kafka cluster is recreated).|

# Finalizer

Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/controllers/inventory/inventory_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (r *InventoryReconciler) Reconcile(ctx context.Context,

transportConn := config.GetTransporterConn()
if transportConn == nil || transportConn.BootstrapServer == "" {
reconcileErr = fmt.Errorf("the transport connection(%s) must not be empty", transportConn)
reconcileErr = fmt.Errorf("the transport connection(%v) must not be empty", transportConn)
return ctrl.Result{}, reconcileErr
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: kafka
name: kraft
labels:
strimzi.io/cluster: {{.KafkaCluster}}
namespace: {{.Namespace}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
Expand Down Expand Up @@ -91,6 +93,7 @@ type strimziTransporter struct {
enableTLS bool
// default is false, to create topic for each managed hub
sharedTopics bool
isNewKafkaCluster bool
topicPartitionReplicas int32
}

Expand Down Expand Up @@ -179,7 +182,15 @@ func WithSubName(name string) KafkaOption {
// EnsureKafka the kafka subscription, cluster, metrics, global hub user and topic
func (k *strimziTransporter) EnsureKafka() (bool, error) {
log.Debug("reconcile global hub kafka transport...")
err := k.ensureSubscription(k.mgh)

// Delete the old kafka cluster. Only used to handle the upgrade from globalhub 1.3 -> 1.4
// TODO: Remove the code in globalhub 1.5
err := k.RemoveOldKafkaCluster(k.mgh.Namespace)
if err != nil {
return true, err
}

err = k.ensureSubscription(k.mgh)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -207,6 +218,39 @@ func (k *strimziTransporter) EnsureKafka() (bool, error) {
return false, nil
}

// RemoveOldKafkaCluster removes a leftover ZooKeeper-based kafka cluster so a
// KRaft-based one can be created in its place. It is a no-op when the Kafka
// CRD is not installed, the cluster is absent, or the cluster already has no
// ZooKeeper section.
// TODO: Should be removed in globalhub 1.5
func (k *strimziTransporter) RemoveOldKafkaCluster(ns string) error {
	kafkaGVR := schema.GroupVersionResource{
		Group:    "kafka.strimzi.io",
		Version:  "v1beta2",
		Resource: "kafkas",
	}
	// Bail out quietly when the Kafka CRD is not registered at all.
	if _, err := k.manager.GetRESTMapper().KindFor(kafkaGVR); err != nil {
		if meta.IsNoMatchError(err) {
			return nil
		}
		return err
	}

	cluster := &kafkav1beta2.Kafka{}
	key := types.NamespacedName{Name: k.kafkaClusterName, Namespace: ns}
	if err := k.manager.GetClient().Get(k.ctx, key, cluster); err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// A missing Zookeeper section means the cluster is already KRaft-based.
	if cluster.Spec.Zookeeper == nil {
		return nil
	}

	log.Infof("delete old kafka cluster")
	k.isNewKafkaCluster = true
	return k.manager.GetClient().Delete(k.ctx, cluster)
}

// renderKafkaResources renders the kafka podmonitor and metrics, and the kafkaUser and kafkaTopic for the global hub
func (k *strimziTransporter) renderKafkaResources(mgh *operatorv1alpha4.MulticlusterGlobalHub) error {
statusTopic := config.GetRawStatusTopic()
Expand Down Expand Up @@ -448,7 +492,7 @@ func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.K
// topics
credential.StatusTopic = config.GetStatusTopic(clusterName)
credential.SpecTopic = config.GetSpecTopic()

credential.IsNewKafkaCluster = k.isNewKafkaCluster
// don't need to load the client cert/key from the kafka user, since it use the external kafkaUser
// userName := config.GetKafkaUserName(clusterName)
// if !k.enableTLS {
Expand Down
3 changes: 3 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ const (
ManagedClusterManagedByAnnotation = "global-hub.open-cluster-management.io/managed-by"
// identify the resource is from the global hub cluster
OriginOwnerReferenceAnnotation = "global-hub.open-cluster-management.io/origin-ownerreference-uid"

// resync the kafka client secret in agent
ResyncKafkaClientSecretAnnotation = "global-hub.open-cluster-management.io/resign-kafka-client-secret" // #nosec G101
)

// store all the finalizers
Expand Down
44 changes: 43 additions & 1 deletion pkg/transport/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/stolostron/multicluster-global-hub/pkg/constants"
"github.com/stolostron/multicluster-global-hub/pkg/logger"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
Expand Down Expand Up @@ -223,7 +224,10 @@ func (c *TransportCtrl) ReconcileKafkaCredential(ctx context.Context, secret *co
if err != nil {
return false, err
}

err = c.ResyncKafkaClientSecret(ctx, kafkaConn, secret)
if err != nil {
return false, err
}
// update the watching secret list
// NOTE(review): the `||` on the next line looks like it should be `&&` — as
// written, a non-empty CASecretName is appended on every reconcile even when
// it is already in extraSecretNames, and an empty name is appended too.
if kafkaConn.CASecretName != "" || !utils.ContainsString(c.extraSecretNames, kafkaConn.CASecretName) {
c.extraSecretNames = append(c.extraSecretNames, kafkaConn.CASecretName)
Expand All @@ -240,6 +244,44 @@ func (c *TransportCtrl) ReconcileKafkaCredential(ctx context.Context, secret *co
return true, nil
}

// ResyncKafkaClientSecret forces the agent to re-obtain its kafka client
// secret after the kafka cluster has been recreated (ZooKeeper -> KRaft
// upgrade): it deletes the signed client secret and then marks the transport
// config secret with ResyncKafkaClientSecretAnnotation so this is done only
// once.
// TODO: Remove the code in globalhub 1.5
func (c *TransportCtrl) ResyncKafkaClientSecret(ctx context.Context, kafkaConn *transport.KafkaConfig, secret *corev1.Secret) error {
	log.Debugf("resync kafka client secret: %v", kafkaConn.IsNewKafkaCluster)
	// Nothing to do unless the kafka cluster was just recreated.
	if !kafkaConn.IsNewKafkaCluster {
		return nil
	}

	// The annotation acts as a tombstone: once present, the resync already
	// happened. Indexing a nil map is safe in Go, so no nil guard is needed.
	if _, ok := secret.Annotations[constants.ResyncKafkaClientSecretAnnotation]; ok {
		return nil
	}
	if kafkaConn.ClientSecretName == "" {
		return nil
	}

	signedSecret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      kafkaConn.ClientSecretName,
			Namespace: secret.Namespace,
		},
	}
	log.Infof("remove kafka client secret: %v", kafkaConn.ClientSecretName)

	// NOTE(review): if the client secret is already gone, Delete returns a
	// NotFound error, the annotation below is never written, and every later
	// reconcile fails here — consider tolerating IsNotFound. Confirm before
	// changing, since the current tests only cover the secret-exists path.
	err := c.runtimeClient.Delete(ctx, signedSecret)
	if err != nil {
		return err
	}

	transportSecret := secret.DeepCopy()
	if transportSecret.Annotations == nil {
		transportSecret.Annotations = make(map[string]string)
	}
	log.Infof("update transport config secret")

	transportSecret.Annotations[constants.ResyncKafkaClientSecretAnnotation] = "true"
	return c.runtimeClient.Update(ctx, transportSecret)
}

func (c *TransportCtrl) ReconcileRestfulCredential(ctx context.Context, secret *corev1.Secret) (
updated bool, err error,
) {
Expand Down
70 changes: 70 additions & 0 deletions pkg/transport/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/stolostron/multicluster-global-hub/pkg/constants"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)
Expand Down Expand Up @@ -199,3 +201,71 @@ Wf86aX6PepsntZv2GYlA5UpabfT2EZICICpJ5h/iI+i341gBmLiAFQOyTDT+/wQc
-----END CERTIFICATE-----`)

var keyPem = []byte("-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIIrYSSNQFaA2Hwf1duRSxKtLYX5CB04fSeQ6tF1aY/PuoAoGCCqGSM49\nAwEHoUQDQgAEPR3tU2Fta9ktY+6P9G0cWO+0kETA6SFs38GecTyudlHz6xvCdz8q\nEKTcWGekdmdDPsHloRNtsiCa697B2O9IFA==\n-----END EC PRIVATE KEY-----") // notsecret

// TestTransportCtrl_ResyncKafkaClientSecret verifies the resync path against a
// fake client: no-op when the cluster is not new, no-op when the tombstone
// annotation is already set, and a clean run when the client secret must be
// deleted and the transport secret annotated.
func TestTransportCtrl_ResyncKafkaClientSecret(t *testing.T) {
	type testCase struct {
		name        string
		kafkaConn   *transport.KafkaConfig
		secret      *corev1.Secret
		initObjects []runtime.Object
	}

	cases := []testCase{
		{
			name: "no new kafka",
			kafkaConn: &transport.KafkaConfig{
				IsNewKafkaCluster: false,
			},
			secret: nil,
		},
		{
			name: "new kafka, has synced",
			kafkaConn: &transport.KafkaConfig{
				IsNewKafkaCluster: true,
			},
			secret: &corev1.Secret{
				ObjectMeta: metav1.ObjectMeta{
					Namespace: "default",
					Name:      "transport-config",
					Annotations: map[string]string{
						constants.ResyncKafkaClientSecretAnnotation: "true",
					},
				},
			},
		},
		{
			name: "new kafka, do not synced",
			kafkaConn: &transport.KafkaConfig{
				IsNewKafkaCluster: true,
				ClientSecretName:  "client-secret",
			},
			secret: &corev1.Secret{
				ObjectMeta: metav1.ObjectMeta{
					Namespace: "default",
					Name:      "transport-config",
				},
			},
			initObjects: []runtime.Object{
				&corev1.Secret{
					ObjectMeta: metav1.ObjectMeta{
						Namespace: "default",
						Name:      "client-secret",
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			objs := tc.initObjects
			if tc.secret != nil {
				objs = append(objs, tc.secret)
			}
			fakeClient := fake.NewClientBuilder().
				WithScheme(scheme.Scheme).
				WithRuntimeObjects(objs...).
				Build()

			ctrl := &TransportCtrl{runtimeClient: fakeClient}
			if err := ctrl.ResyncKafkaClientSecret(context.Background(), tc.kafkaConn, tc.secret); err != nil {
				t.Errorf("TransportCtrl.ResyncKafkaClientSecret() error = %v", err)
			}
		})
	}
}
38 changes: 20 additions & 18 deletions pkg/transport/kafka_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import "sigs.k8s.io/kustomize/kyaml/yaml"
// KafkaConfig is used to connect the transporter instance. The field is persisted to secret
// need to be encode with base64.StdEncoding.EncodeToString
type KafkaConfig struct {
BootstrapServer string `yaml:"bootstrap.server"`
StatusTopic string `yaml:"topic.status,omitempty"`
SpecTopic string `yaml:"topic.spec,omitempty"`
ClusterID string `yaml:"cluster.id,omitempty"`
CACert string `yaml:"ca.crt,omitempty"`
ClientCert string `yaml:"client.crt,omitempty"`
ClientKey string `yaml:"client.key,omitempty"`
CASecretName string `yaml:"ca.secret,omitempty"`
ClientSecretName string `yaml:"client.secret,omitempty"`
BootstrapServer string `yaml:"bootstrap.server"`
StatusTopic string `yaml:"topic.status,omitempty"`
SpecTopic string `yaml:"topic.spec,omitempty"`
ClusterID string `yaml:"cluster.id,omitempty"`
CACert string `yaml:"ca.crt,omitempty"`
ClientCert string `yaml:"client.crt,omitempty"`
ClientKey string `yaml:"client.key,omitempty"`
CASecretName string `yaml:"ca.secret,omitempty"`
ClientSecretName string `yaml:"client.secret,omitempty"`
IsNewKafkaCluster bool `yaml:"isNewKafkaCluster,omitempty"`
}

// YamlMarshal marshal the connection credential object, rawCert specifies whether to keep the cert in the data directly
Expand All @@ -34,15 +35,16 @@ func (k *KafkaConfig) YamlMarshal(rawCert bool) ([]byte, error) {
// DeepCopy creates a deep copy of KafkaConnCredential
func (k *KafkaConfig) DeepCopy() *KafkaConfig {
return &KafkaConfig{
BootstrapServer: k.BootstrapServer,
StatusTopic: k.StatusTopic,
SpecTopic: k.SpecTopic,
ClusterID: k.ClusterID,
CACert: k.CACert,
ClientCert: k.ClientCert,
ClientKey: k.ClientKey,
CASecretName: k.CASecretName,
ClientSecretName: k.ClientSecretName,
BootstrapServer: k.BootstrapServer,
StatusTopic: k.StatusTopic,
SpecTopic: k.SpecTopic,
ClusterID: k.ClusterID,
CACert: k.CACert,
ClientCert: k.ClientCert,
ClientKey: k.ClientKey,
CASecretName: k.CASecretName,
ClientSecretName: k.ClientSecretName,
IsNewKafkaCluster: k.IsNewKafkaCluster,
}
}

Expand Down

0 comments on commit b36ba67

Please sign in to comment.