From bc06ee9b39f88c011f86c04617a4f715f330b96f Mon Sep 17 00:00:00 2001 From: myan Date: Tue, 13 Aug 2024 14:05:00 +0000 Subject: [PATCH 01/12] Add Secrets Signed-off-by: myan --- .../addon/addon_controller_manifests.go | 8 ++++++++ ...ter-global-hub-agent-kafka-certs-secret.yaml | 6 ++++++ ...-global-hub-agent-kafka-cluster-ca-cert.yaml | 1 + .../hubofhubs/manager/manager_reconciler.go | 16 +++++++++++++--- .../manifests/manager-transport-secret.yaml | 16 ++++++++++++++++ .../transporter/protocol/strimzi_transporter.go | 17 +++++++++-------- pkg/constants/constants.go | 6 ++++++ pkg/transport/type.go | 3 ++- 8 files changed, 61 insertions(+), 12 deletions(-) create mode 100644 operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 4db16f693..97fd0c4ce 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -41,7 +41,9 @@ type ManifestsConfig struct { ImagePullSecretData string ImagePullPolicy string LeafHubID string + KafkaAgentSecretName string KafkaBootstrapServer string + KafkaBootstrapServers string TransportType string KafkaCACert string KafkaClientCert string @@ -49,6 +51,8 @@ type ManifestsConfig struct { KafkaClientCertSecret string KafkaConsumerTopic string KafkaProducerTopic string + KafkaSpecTopic string + KafkaStatusTopic string MessageCompressionType string InstallACMHub bool Channel string @@ -223,13 +227,17 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, HoHAgentImage: image, ImagePullPolicy: string(imagePullPolicy), LeafHubID: cluster.Name, + KafkaAgentSecretName: constants.GHAgentTransportSecret, KafkaBootstrapServer: kafkaConnection.BootstrapServer, + KafkaBootstrapServers: base64.StdEncoding.EncodeToString([]byte(kafkaConnection.BootstrapServer)), KafkaCACert: kafkaConnection.CACert, KafkaClientCert: kafkaConnection.ClientCert, KafkaClientKey: kafkaConnection.ClientKey, KafkaClientCertSecret: certificates.AgentCertificateSecretName(), KafkaConsumerTopic: clusterTopic.SpecTopic, + KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.SpecTopic)), KafkaProducerTopic: clusterTopic.StatusTopic, + KafkaStatusTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.StatusTopic)), MessageCompressionType: string(operatorconstants.GzipCompressType), TransportType: string(transport.Kafka), LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index a223796eb..98a1ca07a 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -6,8 +6,14 @@ metadata: namespace: {{ .AddonInstallNamespace }} labels: addon.open-cluster-management.io/hosted-manifest-location: none + name: {{.KafkaAgentSecretName}} + client-cert-secret: {{.KafkaClientCertSecret}} + transport-type: {{.TransportType}} type: Opaque data: + "bootstrap_server": {{.KafkaBootstrapServers}} + "status_topic": {{.KafkaStatusTopic}} + "spec_topic": {{.KafkaSpecTopic}} "ca.crt": "{{.KafkaCACert}}" "client.crt": "{{.KafkaClientCert}}" "client.key": "{{.KafkaClientKey}}" diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml index a6b997e76..38711f430 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml @@ -6,6 +6,7 @@ metadata: namespace: {{ .AddonInstallNamespace }} labels: addon.open-cluster-management.io/hosted-manifest-location: none + name: {{.KafkaAgentSecretName}} type: Opaque data: "ca.crt": "{{.KafkaCACert}}" diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 73810b1dd..4ebb403f7 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -121,13 +121,18 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, DatabaseURL: base64.StdEncoding.EncodeToString( []byte(storageConn.SuperuserDatabaseURI)), PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert), + KafkaManagerSecretName: constants.GHManagerTransportSecret, KafkaClusterIdentity: transportConn.Identity, - KafkaCACert: transportConn.CACert, - KafkaClientCert: transportConn.ClientCert, - KafkaClientKey: transportConn.ClientKey, + KafkaClusterID: base64.StdEncoding.EncodeToString([]byte(transportConn.Identity)), KafkaBootstrapServer: transportConn.BootstrapServer, + KafkaBootstrapServers: base64.StdEncoding.EncodeToString([]byte(transportConn.BootstrapServer)), KafkaConsumerTopic: config.ManagerStatusTopic(), + KafkaStatusTopic: base64.StdEncoding.EncodeToString([]byte(config.ManagerStatusTopic())), KafkaProducerTopic: config.GetSpecTopic(), + KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(config.GetSpecTopic())), + KafkaCACert: transportConn.CACert, + KafkaClientCert: transportConn.ClientCert, + KafkaClientKey: transportConn.ClientKey, Namespace: mgh.Namespace, MessageCompressionType: string(operatorconstants.GzipCompressType), TransportType: string(transport.Kafka), @@ -193,13 +198,18 @@ type ManagerVariables struct { ProxySessionSecret string DatabaseURL string PostgresCACert string + KafkaManagerSecretName string KafkaClusterIdentity string + KafkaClusterID string KafkaCACert string KafkaConsumerTopic string KafkaProducerTopic string + KafkaStatusTopic string + KafkaSpecTopic string KafkaClientCert string KafkaClientKey string KafkaBootstrapServer string + KafkaBootstrapServers string MessageCompressionType string TransportType string Namespace string diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml new file mode 100644 index 000000000..144b14fbe --- /dev/null +++ b/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Secret +metadata: + name: {{.ManagerTransportSecret}} + namespace: {{.Namespace}} + labels: + name: multicluster-global-hub-manager +type: Opaque +data: + "cluster_id": {{.KafkaClusterID}} + "bootstrap_server": {{.KafkaBootstrapServers}} + "status_topic": {{.KafkaStatusTopic}} + "spec_topic": {{.KafkaSpecTopic}} + "ca.crt": {{.KafkaCACert}} + "client.crt": {{.KafkaClientCert}} + "client.key": {{.KafkaClientKey}} \ No newline at end of file diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index 763823950..d57857d4a 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -419,19 +419,19 @@ func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.C return nil, err } - userName := config.GetKafkaUserName(clusterName) - if !k.enableTLS { - k.log.Info("the kafka cluster hasn't enable tls for user", "username", userName) - return credential, nil - } - // don't need to load the client cert/key from the kafka user, since it use the external kafkaUser + // userName := config.GetKafkaUserName(clusterName) + // if !k.enableTLS { + // k.log.Info("the kafka cluster hasn't enable tls for user", "username", userName) + // return credential, nil + // } // if err := k.loadUserCredentail(userName, credential); err != nil { // return nil, err // } return credential, nil } +// loadUserCredentail add credential with client cert, and key func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential *transport.ConnCredential) error { kafkaUserSecret := &corev1.Secret{} err := k.runtimeClient.Get(k.ctx, types.NamespacedName{ @@ -446,6 +446,7 @@ func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential return nil } +// getConnCredentailByCluster gets credential with clusterId, bootstrapServer, and serverCA func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCredential, error) { kafkaCluster := &kafkav1beta2.Kafka{} err := k.runtimeClient.Get(k.ctx, types.NamespacedName{ @@ -467,8 +468,8 @@ func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCreden clusterIdentity = *kafkaCluster.Status.ClusterId } credential := &transport.ConnCredential{ - Identity: clusterIdentity, - BootstrapServer: *kafkaCluster.Status.Listeners[1].BootstrapServers, + Identity: string([]byte(clusterIdentity)), + BootstrapServer: string([]byte(*kafkaCluster.Status.Listeners[1].BootstrapServers)), CACert: base64.StdEncoding.EncodeToString([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])), } return credential, nil diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 660b290aa..efdffeeb0 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -63,6 +63,12 @@ const ( PostgresCAConfigMap = "multicluster-global-hub-postgres-ca" ) +// the global hub transport secret for manager and agent +const ( + GHManagerTransportSecret = "multicluster-global-hub-transport-manager" // #nosec G101 + GHAgentTransportSecret = "multicluster-global-hub-agent" // #nosec G101 +) + // global hub console secret/configmap names const ( CustomAlertName = "multicluster-global-hub-custom-alerting" diff --git a/pkg/transport/type.go b/pkg/transport/type.go index 358bf3f62..6724057a8 100644 --- a/pkg/transport/type.go +++ b/pkg/transport/type.go @@ -76,7 +76,8 @@ type ClusterTopic struct { StatusTopic string } -// ConnCredential is used to connect the transporter instance +// ConnCredential is used to connect the transporter instance. The field is persisted to secret +// need to be encode with base64.StdEncoding.EncodeToString type ConnCredential struct { Identity string BootstrapServer string From 2c085a949d45328ce054d45aa20a7b96b1a8faa2 Mon Sep 17 00:00:00 2001 From: myan Date: Tue, 13 Aug 2024 14:23:47 +0000 Subject: [PATCH 02/12] fix integration Signed-off-by: myan --- .../multicluster-global-hub-agent-kafka-certs-secret.yaml | 1 + .../pkg/controllers/hubofhubs/manager/manager_reconciler.go | 4 ++-- test/integration/operator/hubofhubs/transporter_test.go | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index 98a1ca07a..d5b5fd6fa 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -8,6 +8,7 @@ metadata: addon.open-cluster-management.io/hosted-manifest-location: none name: {{.KafkaAgentSecretName}} client-cert-secret: {{.KafkaClientCertSecret}} + cluster-ca-secret: kafka-cluster-ca-cert transport-type: {{.TransportType}} type: Opaque data: diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 4ebb403f7..6a5de980a 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -121,7 +121,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, DatabaseURL: base64.StdEncoding.EncodeToString( []byte(storageConn.SuperuserDatabaseURI)), PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert), - KafkaManagerSecretName: constants.GHManagerTransportSecret, + ManagerTransportSecret: constants.GHManagerTransportSecret, KafkaClusterIdentity: transportConn.Identity, KafkaClusterID: base64.StdEncoding.EncodeToString([]byte(transportConn.Identity)), KafkaBootstrapServer: transportConn.BootstrapServer, @@ -198,7 +198,7 @@ type ManagerVariables struct { ProxySessionSecret string DatabaseURL string PostgresCACert string - KafkaManagerSecretName string + ManagerTransportSecret string KafkaClusterIdentity string KafkaClusterID string KafkaCACert string diff --git a/test/integration/operator/hubofhubs/transporter_test.go b/test/integration/operator/hubofhubs/transporter_test.go index d4da21780..c23215d9f 100644 --- a/test/integration/operator/hubofhubs/transporter_test.go +++ b/test/integration/operator/hubofhubs/transporter_test.go @@ -407,7 +407,9 @@ var _ = Describe("transporter", Ordered, func() { // topic: update _, err = trans.EnsureTopic(clusterName) - Expect(err).To(Succeed()) + if !errors.IsAlreadyExists(err) { + Expect(err).To(Succeed()) + } err = trans.Prune(clusterName) Expect(err).To(Succeed()) From d11b91ce0bfb1002d372ca1379273e270fccc2cd Mon Sep 17 00:00:00 2001 From: myan Date: Wed, 14 Aug 2024 01:43:56 +0000 Subject: [PATCH 03/12] fix the e2e Signed-off-by: myan --- .../multicluster-global-hub-agent-kafka-certs-secret.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index d5b5fd6fa..5c7a36f51 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -4,12 +4,13 @@ kind: Secret metadata: name: kafka-certs-secret namespace: {{ .AddonInstallNamespace }} + annotations: + transport-type: {{.TransportType}} + client-cert-secret: {{.KafkaClientCertSecret}} + cluster-ca-secret: kafka-cluster-ca-cert labels: addon.open-cluster-management.io/hosted-manifest-location: none name: {{.KafkaAgentSecretName}} - client-cert-secret: {{.KafkaClientCertSecret}} - cluster-ca-secret: kafka-cluster-ca-cert - transport-type: {{.TransportType}} type: Opaque data: "bootstrap_server": {{.KafkaBootstrapServers}} From 932593b0142b8099ee30ac7336b21366b644be84 Mon Sep 17 00:00:00 2001 From: myan Date: Wed, 14 Aug 2024 02:05:16 +0000 Subject: [PATCH 04/12] rename the transport secret Signed-off-by: myan --- pkg/constants/constants.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index efdffeeb0..917186608 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -65,8 +65,8 @@ const ( // the global hub transport secret for manager and agent const ( - GHManagerTransportSecret = "multicluster-global-hub-transport-manager" // #nosec G101 - GHAgentTransportSecret = "multicluster-global-hub-agent" // #nosec G101 + GHManagerTransportSecret = "multicluster-global-hub-manager-transport" // #nosec G101 + GHAgentTransportSecret = "multicluster-global-hub-agent-transport" // #nosec G101 ) // global hub console secret/configmap names From 9e9f1b493d2d497632c2fc9258cbcb6fe58afb09 Mon Sep 17 00:00:00 2001 From: myan Date: Wed, 14 Aug 2024 06:48:06 +0000 Subject: [PATCH 05/12] roll back unnessary Signed-off-by: myan --- .../hubofhubs/transporter/protocol/strimzi_transporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index d57857d4a..a9a543a88 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -468,8 +468,8 @@ func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCreden clusterIdentity = *kafkaCluster.Status.ClusterId } credential := &transport.ConnCredential{ - Identity: string([]byte(clusterIdentity)), - BootstrapServer: string([]byte(*kafkaCluster.Status.Listeners[1].BootstrapServers)), + Identity: clusterIdentity, + BootstrapServer: *kafkaCluster.Status.Listeners[1].BootstrapServers, CACert: base64.StdEncoding.EncodeToString([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])), } return credential, nil From 2719aaf8fcf6f8e2bf5139f75fb6645c83107dcd Mon Sep 17 00:00:00 2001 From: myan Date: Thu, 15 Aug 2024 04:17:28 +0000 Subject: [PATCH 06/12] reply review Signed-off-by: myan --- .../controllers/addon/addon_controller_manifests.go | 3 +++ ...ticluster-global-hub-agent-kafka-certs-secret.yaml | 11 +++++++---- ...luster-global-hub-agent-kafka-cluster-ca-cert.yaml | 2 +- .../manager/manifests/manager-transport-secret.yaml | 8 ++++---- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 97fd0c4ce..6492e9e32 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -24,6 +24,7 @@ import ( "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" + "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs/transporter/protocol" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/transport" @@ -49,6 +50,7 @@ type ManifestsConfig struct { KafkaClientCert string KafkaClientKey string KafkaClientCertSecret string + KafkaClusterCASecret string KafkaConsumerTopic string KafkaProducerTopic string KafkaSpecTopic string @@ -234,6 +236,7 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, KafkaClientCert: kafkaConnection.ClientCert, KafkaClientKey: kafkaConnection.ClientKey, KafkaClientCertSecret: certificates.AgentCertificateSecretName(), + KafkaClusterCASecret: fmt.Sprintf("%s-cluster-ca-cert", protocol.KafkaClusterName), KafkaConsumerTopic: clusterTopic.SpecTopic, KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.SpecTopic)), KafkaProducerTopic: clusterTopic.StatusTopic, diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index 5c7a36f51..7cdf43f5f 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -7,15 +7,18 @@ metadata: annotations: transport-type: {{.TransportType}} client-cert-secret: {{.KafkaClientCertSecret}} - cluster-ca-secret: kafka-cluster-ca-cert + cluster-ca-secret: {{.KafkaClusterCASecret}} labels: addon.open-cluster-management.io/hosted-manifest-location: none name: {{.KafkaAgentSecretName}} type: Opaque data: - "bootstrap_server": {{.KafkaBootstrapServers}} - "status_topic": {{.KafkaStatusTopic}} - "spec_topic": {{.KafkaSpecTopic}} + "bootstrap.server": {{.KafkaBootstrapServers}} + "topic.status": {{.KafkaStatusTopic}} + "topic.spec": {{.KafkaSpecTopic}} + "ca.crt": {{.KafkaCACert}} + "client.crt": {{.KafkaClientCert}} + "client.key": {{.KafkaClientKey}} "ca.crt": "{{.KafkaCACert}}" "client.crt": "{{.KafkaClientCert}}" "client.key": "{{.KafkaClientKey}}" diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml index 38711f430..cdb945423 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml @@ -2,7 +2,7 @@ apiVersion: v1 kind: Secret metadata: - name: kafka-cluster-ca-cert + name: {{.KafkaClusterCASecret}} namespace: {{ .AddonInstallNamespace }} labels: addon.open-cluster-management.io/hosted-manifest-location: none diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml index 144b14fbe..371e912c2 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml +++ b/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml @@ -7,10 +7,10 @@ metadata: name: multicluster-global-hub-manager type: Opaque data: - "cluster_id": {{.KafkaClusterID}} - "bootstrap_server": {{.KafkaBootstrapServers}} - "status_topic": {{.KafkaStatusTopic}} - "spec_topic": {{.KafkaSpecTopic}} + "cluster.id": {{.KafkaClusterID}} + "bootstrap.server": {{.KafkaBootstrapServers}} + "topic.status": {{.KafkaStatusTopic}} + "topic.spec": {{.KafkaSpecTopic}} "ca.crt": {{.KafkaCACert}} "client.crt": {{.KafkaClientCert}} "client.key": {{.KafkaClientKey}} \ No newline at end of file From 068307337505efa2dd95aafaea1dacbb53a4952c Mon Sep 17 00:00:00 2001 From: myan Date: Thu, 15 Aug 2024 09:52:29 +0000 Subject: [PATCH 07/12] review Signed-off-by: myan --- operator/pkg/config/transport_config.go | 6 ++-- .../addon/addon_controller_manifests.go | 23 +++++++-------- ...r-global-hub-agent-kafka-certs-secret.yaml | 11 -------- ...lobal-hub-agent-kafka-cluster-ca-cert.yaml | 1 - ...bal-hub-agent-transport-config-secret.yaml | 12 ++++++++ .../hubofhubs/manager/manager_reconciler.go | 28 +++++++++---------- .../manifests/manager-transport-secret.yaml | 16 ----------- .../manifests/transport-config-secret.yaml | 10 +++++++ .../transporter/protocol/byo_transporter.go | 9 ++++-- .../protocol/strimzi_kafka_controller.go | 14 ++++++---- .../protocol/strimzi_transporter.go | 24 ++++++++++++---- pkg/constants/constants.go | 5 ++-- pkg/transport/transport.go | 4 +-- pkg/transport/type.go | 20 ++++++++----- 14 files changed, 102 insertions(+), 81 deletions(-) create mode 100644 operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml delete mode 100644 operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml create mode 100644 operator/pkg/controllers/hubofhubs/manager/manifests/transport-config-secret.yaml diff --git a/operator/pkg/config/transport_config.go b/operator/pkg/config/transport_config.go index 3e75f5d5d..2b8fd6ab8 100644 --- a/operator/pkg/config/transport_config.go +++ b/operator/pkg/config/transport_config.go @@ -28,7 +28,7 @@ const ( var ( transporterProtocol transport.TransportProtocol transporterInstance transport.Transporter - transporterConn *transport.ConnCredential + transporterConn *transport.KafkaConnCredential isBYOKafka = false specTopic = "" statusTopic = "" @@ -38,11 +38,11 @@ var ( clientCACert []byte ) -func SetTransporterConn(conn *transport.ConnCredential) { +func SetTransporterConn(conn *transport.KafkaConnCredential) { transporterConn = conn } -func GetTransporterConn() *transport.ConnCredential { +func GetTransporterConn() *transport.KafkaConnCredential { return transporterConn } diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 6492e9e32..3e72da930 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -19,12 +19,12 @@ import ( addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" clusterv1 "open-cluster-management.io/api/cluster/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" - "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/hubofhubs/transporter/protocol" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/transport" @@ -42,19 +42,17 @@ type ManifestsConfig struct { ImagePullSecretData string ImagePullPolicy string LeafHubID string - KafkaAgentSecretName string + TransportConfigSecret string + KafkaConfigYaml string + KafkaClusterCASecret string KafkaBootstrapServer string - KafkaBootstrapServers string TransportType string KafkaCACert string KafkaClientCert string KafkaClientKey string KafkaClientCertSecret string - KafkaClusterCASecret string KafkaConsumerTopic string KafkaProducerTopic string - KafkaSpecTopic string - KafkaStatusTopic string MessageCompressionType string InstallACMHub bool Channel string @@ -209,6 +207,11 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, return nil, fmt.Errorf("failed to update the kafkauser for the cluster(%s): %v", cluster.Name, err) } + kafkaConfigYaml, err := yaml.Marshal(kafkaConnection) + if err != nil { + return nil, fmt.Errorf("failed to marshalling the kafka config yaml: %w", err) + } + agentResReq := utils.GetResources(operatorconstants.Agent, mgh.Spec.AdvancedConfig) agentRes := &Resources{} jsonData, err := json.Marshal(agentResReq) @@ -229,18 +232,16 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, HoHAgentImage: image, ImagePullPolicy: string(imagePullPolicy), LeafHubID: cluster.Name, - KafkaAgentSecretName: constants.GHAgentTransportSecret, + TransportConfigSecret: constants.GHTransportConfigSecret, + KafkaConfigYaml: string(kafkaConfigYaml), KafkaBootstrapServer: kafkaConnection.BootstrapServer, - KafkaBootstrapServers: base64.StdEncoding.EncodeToString([]byte(kafkaConnection.BootstrapServer)), KafkaCACert: kafkaConnection.CACert, KafkaClientCert: kafkaConnection.ClientCert, KafkaClientKey: kafkaConnection.ClientKey, KafkaClientCertSecret: certificates.AgentCertificateSecretName(), - KafkaClusterCASecret: fmt.Sprintf("%s-cluster-ca-cert", protocol.KafkaClusterName), + KafkaClusterCASecret: kafkaConnection.CASecretName, KafkaConsumerTopic: clusterTopic.SpecTopic, - KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.SpecTopic)), KafkaProducerTopic: clusterTopic.StatusTopic, - KafkaStatusTopic: base64.StdEncoding.EncodeToString([]byte(clusterTopic.StatusTopic)), MessageCompressionType: string(operatorconstants.GzipCompressType), TransportType: string(transport.Kafka), LeaseDuration: strconv.Itoa(electionConfig.LeaseDuration), diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml index 7cdf43f5f..a223796eb 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-certs-secret.yaml @@ -4,21 +4,10 @@ kind: Secret metadata: name: kafka-certs-secret namespace: {{ .AddonInstallNamespace }} - annotations: - transport-type: {{.TransportType}} - client-cert-secret: {{.KafkaClientCertSecret}} - cluster-ca-secret: {{.KafkaClusterCASecret}} labels: addon.open-cluster-management.io/hosted-manifest-location: none - name: {{.KafkaAgentSecretName}} type: Opaque data: - "bootstrap.server": {{.KafkaBootstrapServers}} - "topic.status": {{.KafkaStatusTopic}} - "topic.spec": {{.KafkaSpecTopic}} - "ca.crt": {{.KafkaCACert}} - "client.crt": {{.KafkaClientCert}} - "client.key": {{.KafkaClientKey}} "ca.crt": "{{.KafkaCACert}}" "client.crt": "{{.KafkaClientCert}}" "client.key": "{{.KafkaClientKey}}" diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml index cdb945423..0a4920720 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-kafka-cluster-ca-cert.yaml @@ -6,7 +6,6 @@ metadata: namespace: {{ .AddonInstallNamespace }} labels: addon.open-cluster-management.io/hosted-manifest-location: none - name: {{.KafkaAgentSecretName}} type: Opaque data: "ca.crt": "{{.KafkaCACert}}" diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml new file mode 100644 index 000000000..b458eec01 --- /dev/null +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml @@ -0,0 +1,12 @@ +{{- if not .InstallHostedMode -}} +apiVersion: v1 +kind: Secret +metadata: + name: {{.TransportConfigSecret}} + namespace: {{ .AddonInstallNamespace }} + labels: + addon.open-cluster-management.io/hosted-manifest-location: none +type: Opaque +data: + "kafka.yaml": "{{.KafkaConfigYaml}}" +{{- end -}} diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 6a5de980a..2424341af 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -14,6 +14,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/kustomize/kyaml/yaml" "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" @@ -31,7 +32,7 @@ var fs embed.FS var ( storageConnectionCache *config.PostgresConnection - transportConnectionCache *transport.ConnCredential + transportConnectionCache *transport.KafkaConnCredential ) type ManagerReconciler struct { @@ -110,6 +111,11 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, return fmt.Errorf("failed to get the electionConfig %w", err) } + kafkaConfigYaml, err := yaml.Marshal(transportConn) + if err != nil { + return fmt.Errorf("failed to marshall kafka connetion for config: %w", err) + } + managerObjects, err := hohRenderer.Render("manifests", "", func(profile string) (interface{}, error) { return ManagerVariables{ Image: config.GetImage(config.GlobalHubManagerImageKey), @@ -121,15 +127,12 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, DatabaseURL: base64.StdEncoding.EncodeToString( []byte(storageConn.SuperuserDatabaseURI)), PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert), - ManagerTransportSecret: constants.GHManagerTransportSecret, - KafkaClusterIdentity: transportConn.Identity, - KafkaClusterID: base64.StdEncoding.EncodeToString([]byte(transportConn.Identity)), + TransportConfigSecret: constants.GHTransportConfigSecret, + KafkaConfigYaml: string(kafkaConfigYaml), + KafkaClusterIdentity: transportConn.ClusterID, KafkaBootstrapServer: transportConn.BootstrapServer, - KafkaBootstrapServers: base64.StdEncoding.EncodeToString([]byte(transportConn.BootstrapServer)), KafkaConsumerTopic: config.ManagerStatusTopic(), - KafkaStatusTopic: base64.StdEncoding.EncodeToString([]byte(config.ManagerStatusTopic())), KafkaProducerTopic: config.GetSpecTopic(), - KafkaSpecTopic: base64.StdEncoding.EncodeToString([]byte(config.GetSpecTopic())), KafkaCACert: transportConn.CACert, KafkaClientCert: transportConn.ClientCert, KafkaClientKey: transportConn.ClientKey, @@ -162,7 +165,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, return nil } -func isMiddlewareUpdated(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) bool { +func isMiddlewareUpdated(transportConn *transport.KafkaConnCredential, storageConn *config.PostgresConnection) bool { updated := false if transportConnectionCache == nil || storageConnectionCache == nil { updated = true @@ -179,7 +182,7 @@ func isMiddlewareUpdated(transportConn *transport.ConnCredential, storageConn *c return updated } -func setMiddlewareCache(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) { +func setMiddlewareCache(transportConn *transport.KafkaConnCredential, storageConn *config.PostgresConnection) { if transportConn != nil { transportConnectionCache = transportConn } @@ -198,18 +201,15 @@ type ManagerVariables struct { ProxySessionSecret string DatabaseURL string PostgresCACert string - ManagerTransportSecret string + TransportConfigSecret string + KafkaConfigYaml string KafkaClusterIdentity string - KafkaClusterID string KafkaCACert string KafkaConsumerTopic string KafkaProducerTopic string - KafkaStatusTopic string - KafkaSpecTopic string KafkaClientCert string KafkaClientKey string KafkaBootstrapServer string - KafkaBootstrapServers string MessageCompressionType string TransportType string Namespace string diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml deleted file mode 100644 index 371e912c2..000000000 --- a/operator/pkg/controllers/hubofhubs/manager/manifests/manager-transport-secret.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Secret -metadata: - name: {{.ManagerTransportSecret}} - namespace: {{.Namespace}} - labels: - name: multicluster-global-hub-manager -type: Opaque -data: - "cluster.id": {{.KafkaClusterID}} - "bootstrap.server": {{.KafkaBootstrapServers}} - "topic.status": {{.KafkaStatusTopic}} - "topic.spec": {{.KafkaSpecTopic}} - "ca.crt": {{.KafkaCACert}} - "client.crt": {{.KafkaClientCert}} - "client.key": {{.KafkaClientKey}} \ No newline at end of file diff --git a/operator/pkg/controllers/hubofhubs/manager/manifests/transport-config-secret.yaml b/operator/pkg/controllers/hubofhubs/manager/manifests/transport-config-secret.yaml new file mode 100644 index 000000000..0bb3ec7eb --- /dev/null +++ b/operator/pkg/controllers/hubofhubs/manager/manifests/transport-config-secret.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Secret +metadata: + name: {{.TransportConfigSecret}} + namespace: {{.Namespace}} + labels: + name: multicluster-global-hub-manager +type: Opaque +data: + "kafka.yaml": {{.KafkaConfigYaml}} \ No newline at end of file diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go index 899daf0df..105b6d79f 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go @@ -53,7 +53,7 @@ func (s *BYOTransporter) Prune(clusterName string) error { return nil } -func (s *BYOTransporter) GetConnCredential(username string) (*transport.ConnCredential, error) { +func (s *BYOTransporter) GetConnCredential(clusterName string) (*transport.KafkaConnCredential, error) { kafkaSecret := &corev1.Secret{} err := s.runtimeClient.Get(s.ctx, types.NamespacedName{ Name: s.name, @@ -62,11 +62,14 @@ func (s *BYOTransporter) GetConnCredential(username string) (*transport.ConnCred if err != nil { return nil, err } - return &transport.ConnCredential{ - Identity: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]), + return &transport.KafkaConnCredential{ + ClusterID: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]), BootstrapServer: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]), CACert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("ca.crt")]), ClientCert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.crt")]), ClientKey: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.key")]), + // for the byo case, the status topic isn't change by the clusterName + StatusTopic: config.GetStatusTopic(""), + SpecTopic: config.GetSpecTopic(), }, nil } diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go index b24990b73..f8530f91d 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go @@ -56,7 +56,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) ( config.SetTransporter(trans) // update the transport connection - conn, err := waitTransportConn(ctx, trans, DefaultGlobalHubKafkaUserName) + conn, err := waitManagerTransportConn(ctx, trans, DefaultGlobalHubKafkaUserName) if err != nil { return ctrl.Result{}, err } @@ -116,20 +116,24 @@ func StartKafkaController(ctx context.Context, mgr ctrl.Manager) (*KafkaControll return r, nil } -func waitTransportConn(ctx context.Context, trans *strimziTransporter, kafkaUserSecret string) ( - *transport.ConnCredential, error, +func waitManagerTransportConn(ctx context.Context, trans *strimziTransporter, kafkaUserSecret string) ( + *transport.KafkaConnCredential, error, ) { // set transporter connection - var conn *transport.ConnCredential + var conn *transport.KafkaConnCredential var err error err = wait.PollUntilContextTimeout(ctx, 2*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + // boostrapServer, clusterId, clusterCA conn, err = trans.getConnCredentailByCluster() if err != nil { klog.Info("waiting the kafka cluster credential to be ready...", "message", err.Error()) return false, err } - + // topics + conn.SpecTopic = config.GetSpecTopic() + conn.StatusTopic = config.ManagerStatusTopic() + // clientCert and clientCA if err := trans.loadUserCredentail(kafkaUserSecret, conn); err != nil { klog.Info("waiting the kafka user credential to be ready...", "message", err.Error()) return false, err diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index a9a543a88..91f268185 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -31,6 +31,7 @@ import ( operatorv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" + "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" "github.com/stolostron/multicluster-global-hub/operator/pkg/deployer" "github.com/stolostron/multicluster-global-hub/operator/pkg/renderer" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" @@ -413,12 +414,21 @@ func (k *strimziTransporter) getClusterTopic(clusterName string) *transport.Clus } // the username is the kafkauser, it's the same as the secret name -func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.ConnCredential, error) { +func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.KafkaConnCredential, error) { + // bootstrapServer, clusterId, clusterCA credential, err := k.getConnCredentailByCluster() if err != nil { return nil, err } + // certificates + credential.CASecretName = GetClusterCASecret(k.kafkaClusterName) + credential.ClientCert = certificates.AgentCertificateSecretName() + + // topics + credential.StatusTopic = config.GetStatusTopic(clusterName) + credential.SpecTopic = config.GetSpecTopic() + // don't need to load the client cert/key from the kafka user, since it use the external kafkaUser // userName := config.GetKafkaUserName(clusterName) // if !k.enableTLS { @@ -431,8 +441,12 @@ func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.C return credential, nil } +func GetClusterCASecret(clusterName string) string { + return fmt.Sprintf("%s-cluster-ca-cert", clusterName) +} + // loadUserCredentail add credential with client cert, and key -func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential *transport.ConnCredential) error { +func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential *transport.KafkaConnCredential) error { kafkaUserSecret := &corev1.Secret{} err := k.runtimeClient.Get(k.ctx, types.NamespacedName{ Name: kafkaUserName, @@ -447,7 +461,7 @@ func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential } // getConnCredentailByCluster gets credential with clusterId, bootstrapServer, and serverCA -func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCredential, error) { +func (k *strimziTransporter) getConnCredentailByCluster() (*transport.KafkaConnCredential, error) { kafkaCluster := &kafkav1beta2.Kafka{} err := k.runtimeClient.Get(k.ctx, types.NamespacedName{ Name: k.kafkaClusterName, @@ -467,8 +481,8 @@ func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCreden if kafkaCluster.Status.ClusterId != nil { clusterIdentity = *kafkaCluster.Status.ClusterId } - credential := &transport.ConnCredential{ - Identity: clusterIdentity, + credential := &transport.KafkaConnCredential{ + ClusterID: clusterIdentity, BootstrapServer: *kafkaCluster.Status.Listeners[1].BootstrapServers, CACert: base64.StdEncoding.EncodeToString([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])), } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 917186608..dc2a5862f 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -63,10 +63,9 @@ const ( PostgresCAConfigMap = "multicluster-global-hub-postgres-ca" ) -// the global hub transport secret for manager and agent +// the global hub transport config secret for manager and agent const ( - GHManagerTransportSecret = "multicluster-global-hub-manager-transport" // #nosec G101 - GHAgentTransportSecret = "multicluster-global-hub-agent-transport" // #nosec G101 + GHTransportConfigSecret = "transport-config" // #nosec G101 ) // global hub console secret/configmap names diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 021f7dd3f..f718ee84c 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -28,6 +28,6 @@ type Transporter interface { // Cleanup will delete the user or topic for the cluster Prune(clusterName string) error - // get the connection credential by user - GetConnCredential(clusterName string) (*ConnCredential, error) + // get the connection credential by clusterName + GetConnCredential(clusterName string) (*KafkaConnCredential, error) } diff --git a/pkg/transport/type.go b/pkg/transport/type.go index 6724057a8..abb0e2d3c 100644 --- a/pkg/transport/type.go +++ b/pkg/transport/type.go @@ -76,14 +76,20 @@ type ClusterTopic struct { StatusTopic string } -// ConnCredential is used to connect the transporter instance. The field is persisted to secret +// KafkaConnCredential is used to connect the transporter instance. The field is persisted to secret // need to be encode with base64.StdEncoding.EncodeToString -type ConnCredential struct { - Identity string - BootstrapServer string - CACert string - ClientCert string - ClientKey string +type KafkaConnCredential struct { + BootstrapServer string `yaml:"bootstrap.server"` + StatusTopic string `yaml:"topic.status,omitempty"` + SpecTopic string `yaml:"topic.spec,omitempty"` + ClusterID string `yaml:"cluster.id,omitempty"` + // the following fields are only for the manager, and the agent of byo/standalone kafka + CACert string `yaml:"ca.key,omitempty"` + ClientCert string `yaml:"client.crt,omitempty"` + ClientKey string `yaml:"client.key,omitempty"` + // the following fields are only for the agent of built-in kafka + CASecretName string `yaml:"ca.secret,omitempty"` + ClientSecretName string `yaml:"client.secret,omitempty"` } type EventPosition struct { From 8b90efe0e97ad69b8f47b81a791d05a041576836 Mon Sep 17 00:00:00 2001 From: myan Date: Thu, 15 Aug 2024 11:16:17 +0000 Subject: [PATCH 08/12] fix integration Signed-off-by: myan --- .../pkg/controllers/addon/addon_controller_manifests.go | 2 +- .../pkg/controllers/hubofhubs/manager/manager_reconciler.go | 2 +- test/integration/operator/addon/addon_deploy_test.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 3e72da930..7e6e26781 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -233,7 +233,7 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, ImagePullPolicy: string(imagePullPolicy), LeafHubID: cluster.Name, TransportConfigSecret: constants.GHTransportConfigSecret, - KafkaConfigYaml: string(kafkaConfigYaml), + KafkaConfigYaml: base64.StdEncoding.EncodeToString(kafkaConfigYaml), KafkaBootstrapServer: kafkaConnection.BootstrapServer, KafkaCACert: kafkaConnection.CACert, KafkaClientCert: kafkaConnection.ClientCert, diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 2424341af..884f17ee3 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -128,7 +128,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, []byte(storageConn.SuperuserDatabaseURI)), PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert), TransportConfigSecret: constants.GHTransportConfigSecret, - KafkaConfigYaml: string(kafkaConfigYaml), + KafkaConfigYaml: base64.StdEncoding.EncodeToString(kafkaConfigYaml), KafkaClusterIdentity: transportConn.ClusterID, KafkaBootstrapServer: transportConn.BootstrapServer, KafkaConsumerTopic: config.ManagerStatusTopic(), diff --git a/test/integration/operator/addon/addon_deploy_test.go b/test/integration/operator/addon/addon_deploy_test.go index ddd73f6d0..749ac8e61 100644 --- a/test/integration/operator/addon/addon_deploy_test.go +++ b/test/integration/operator/addon/addon_deploy_test.go @@ -103,7 +103,7 @@ var _ = Describe("addon deploy", func() { }, work) }, timeout, interval).ShouldNot(HaveOccurred()) - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(10)) }) It("Should create HoH agent and ACM when an OCP is imported", func() { @@ -147,7 +147,7 @@ var _ = Describe("addon deploy", func() { }, timeout, interval).ShouldNot(HaveOccurred()) // contains both the ACM and the Global Hub manifests - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(18)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(19)) }) It("Should create HoH addon when an OCP with deploy mode = default is imported in hosted mode", func() { @@ -192,7 +192,7 @@ var _ = Describe("addon deploy", func() { }, work) }, timeout, interval).ShouldNot(HaveOccurred()) - Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9)) + Expect(len(work.Spec.Workload.Manifests)).Should(Equal(10)) }) It("Should create HoH addon when an OCP with deploy mode = Hosted is imported in hosted mode", func() { From fc0b6d9b3c9aa7cfff130cb361be42fc407d1809 Mon Sep 17 00:00:00 2001 From: myan Date: Thu, 15 Aug 2024 14:43:08 +0000 Subject: [PATCH 09/12] fix e2e Signed-off-by: myan --- operator/pkg/controllers/addon/addon_controller_manifests.go | 2 +- .../multicluster-global-hub-agent-transport-config-secret.yaml | 2 +- .../hubofhubs/transporter/protocol/strimzi_transporter.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/operator/pkg/controllers/addon/addon_controller_manifests.go b/operator/pkg/controllers/addon/addon_controller_manifests.go index 7e6e26781..4b0f0b9c4 100644 --- a/operator/pkg/controllers/addon/addon_controller_manifests.go +++ b/operator/pkg/controllers/addon/addon_controller_manifests.go @@ -19,7 +19,7 @@ import ( addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" clusterv1 "open-cluster-management.io/api/cluster/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/yaml" + "sigs.k8s.io/kustomize/kyaml/yaml" globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" diff --git a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml index b458eec01..9cb0cb487 100644 --- a/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml +++ b/operator/pkg/controllers/addon/manifests/templates/agent/multicluster-global-hub-agent-transport-config-secret.yaml @@ -8,5 +8,5 @@ metadata: addon.open-cluster-management.io/hosted-manifest-location: none type: Opaque data: - "kafka.yaml": "{{.KafkaConfigYaml}}" + "kafka.yaml": {{.KafkaConfigYaml}} {{- end -}} diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index 91f268185..9e65c5279 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -423,7 +423,7 @@ func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.K // certificates credential.CASecretName = GetClusterCASecret(k.kafkaClusterName) - credential.ClientCert = certificates.AgentCertificateSecretName() + credential.ClientSecretName = certificates.AgentCertificateSecretName() // topics credential.StatusTopic = config.GetStatusTopic(clusterName) From f3ab8a4dca2dde414b8e1c593967f93fcacc0951 Mon Sep 17 00:00:00 2001 From: myan Date: Fri, 16 Aug 2024 01:09:39 +0000 Subject: [PATCH 10/12] fix sonar Signed-off-by: myan --- .../operator/hubofhubs/transporter_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/integration/operator/hubofhubs/transporter_test.go b/test/integration/operator/hubofhubs/transporter_test.go index c23215d9f..adc658430 100644 --- a/test/integration/operator/hubofhubs/transporter_test.go +++ b/test/integration/operator/hubofhubs/transporter_test.go @@ -191,12 +191,20 @@ var _ = Describe("transporter", Ordered, func() { }, 10*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) Eventually(func() error { - // the connection is generated + // the connection for manager is generated conn := config.GetTransporterConn() if conn == nil { return fmt.Errorf("the strimzi connection should not be nil") } - utils.PrettyPrint(conn) + // get the conn by transporter + tran := config.GetTransporter() + agentConn, err := tran.GetConnCredential("hub1") + if err != nil { + return err + } + if agentConn == nil { + return fmt.Errorf("the strimzi connection for hub1 should not be nil") + } return nil }, 20*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) }) From ccebc0ddf852671521deddaf38f2656a02dc7b7e Mon Sep 17 00:00:00 2001 From: myan Date: Fri, 16 Aug 2024 02:01:37 +0000 Subject: [PATCH 11/12] log the kafka user and topics information Signed-off-by: myan --- test/script/e2e_log.sh | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/script/e2e_log.sh b/test/script/e2e_log.sh index 3a546ca9e..62c5febf7 100755 --- a/test/script/e2e_log.sh +++ b/test/script/e2e_log.sh @@ -20,4 +20,12 @@ export KUBECONFIG="${CONFIG_DIR}/${CLUSTER_NAME}" echo ">> COMPONENT=$COMPONENT NAMESPACE=$NAMESPACE CLUSTER=$CLUSTER_NAME" -kubectl logs deployment/"$COMPONENT" -n "$NAMESPACE" --all-containers=true \ No newline at end of file +kubectl logs deployment/"$COMPONENT" -n "$NAMESPACE" --all-containers=true + +[ "$COMPONENT" != "multicluster-global-hub-operator" ] && exit 0 + +echo ">>>> KafkaUsers" +kubectl get kafkauser -oyaml + +echo ">>>> KafkaTopics" +kubectl get kafkatopics -oyaml From 4035268efbc58e3332716d470cb9004b66033bb1 Mon Sep 17 00:00:00 2001 From: myan Date: Fri, 16 Aug 2024 09:15:01 +0000 Subject: [PATCH 12/12] add kafkauser and topics log Signed-off-by: myan --- test/script/e2e_log.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/script/e2e_log.sh b/test/script/e2e_log.sh index 62c5febf7..c41faec1d 100755 --- a/test/script/e2e_log.sh +++ b/test/script/e2e_log.sh @@ -25,7 +25,7 @@ kubectl logs deployment/"$COMPONENT" -n "$NAMESPACE" --all-containers=true [ "$COMPONENT" != "multicluster-global-hub-operator" ] && exit 0 echo ">>>> KafkaUsers" -kubectl get kafkauser -oyaml +kubectl get kafkauser -n "$NAMESPACE" -oyaml echo ">>>> KafkaTopics" -kubectl get kafkatopics -oyaml +kubectl get kafkatopics -n "$NAMESPACE" -oyaml