Skip to content

Commit

Permalink
✨ Add transport secret for global hub manager and agent (#1054)
Browse files Browse the repository at this point in the history
* Add Secrets

Signed-off-by: myan <[email protected]>

* fix integration

Signed-off-by: myan <[email protected]>

* fix the e2e

Signed-off-by: myan <[email protected]>

* rename the transport secret

Signed-off-by: myan <[email protected]>

* roll back unnessary

Signed-off-by: myan <[email protected]>

* reply review

Signed-off-by: myan <[email protected]>

* review

Signed-off-by: myan <[email protected]>

* fix integration

Signed-off-by: myan <[email protected]>

* fix e2e

Signed-off-by: myan <[email protected]>

* fix sonar

Signed-off-by: myan <[email protected]>

* log the kafka user and topics information

Signed-off-by: myan <[email protected]>

* add kafkauser and topics log

Signed-off-by: myan <[email protected]>

---------

Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa authored Aug 19, 2024
1 parent 3abe3ce commit 6e393b6
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 45 deletions.
6 changes: 3 additions & 3 deletions operator/pkg/config/transport_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
var (
transporterProtocol transport.TransportProtocol
transporterInstance transport.Transporter
transporterConn *transport.ConnCredential
transporterConn *transport.KafkaConnCredential
isBYOKafka = false
specTopic = ""
statusTopic = ""
Expand All @@ -38,11 +38,11 @@ var (
clientCACert []byte
)

func SetTransporterConn(conn *transport.ConnCredential) {
func SetTransporterConn(conn *transport.KafkaConnCredential) {
transporterConn = conn
}

func GetTransporterConn() *transport.ConnCredential {
func GetTransporterConn() *transport.KafkaConnCredential {
return transporterConn
}

Expand Down
12 changes: 12 additions & 0 deletions operator/pkg/controllers/addon/addon_controller_manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kyaml/yaml"

globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
Expand All @@ -41,6 +42,9 @@ type ManifestsConfig struct {
ImagePullSecretData string
ImagePullPolicy string
LeafHubID string
TransportConfigSecret string
KafkaConfigYaml string
KafkaClusterCASecret string
KafkaBootstrapServer string
TransportType string
KafkaCACert string
Expand Down Expand Up @@ -203,6 +207,11 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster,
return nil, fmt.Errorf("failed to update the kafkauser for the cluster(%s): %v", cluster.Name, err)
}

kafkaConfigYaml, err := yaml.Marshal(kafkaConnection)
if err != nil {
return nil, fmt.Errorf("failed to marshalling the kafka config yaml: %w", err)
}

agentResReq := utils.GetResources(operatorconstants.Agent, mgh.Spec.AdvancedConfig)
agentRes := &Resources{}
jsonData, err := json.Marshal(agentResReq)
Expand All @@ -223,11 +232,14 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster,
HoHAgentImage: image,
ImagePullPolicy: string(imagePullPolicy),
LeafHubID: cluster.Name,
TransportConfigSecret: constants.GHTransportConfigSecret,
KafkaConfigYaml: base64.StdEncoding.EncodeToString(kafkaConfigYaml),
KafkaBootstrapServer: kafkaConnection.BootstrapServer,
KafkaCACert: kafkaConnection.CACert,
KafkaClientCert: kafkaConnection.ClientCert,
KafkaClientKey: kafkaConnection.ClientKey,
KafkaClientCertSecret: certificates.AgentCertificateSecretName(),
KafkaClusterCASecret: kafkaConnection.CASecretName,
KafkaConsumerTopic: clusterTopic.SpecTopic,
KafkaProducerTopic: clusterTopic.StatusTopic,
MessageCompressionType: string(operatorconstants.GzipCompressType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
apiVersion: v1
kind: Secret
metadata:
name: kafka-cluster-ca-cert
name: {{.KafkaClusterCASecret}}
namespace: {{ .AddonInstallNamespace }}
labels:
addon.open-cluster-management.io/hosted-manifest-location: none
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{- if not .InstallHostedMode -}}
apiVersion: v1
kind: Secret
metadata:
name: {{.TransportConfigSecret}}
namespace: {{ .AddonInstallNamespace }}
labels:
addon.open-cluster-management.io/hosted-manifest-location: none
type: Opaque
data:
"kafka.yaml": {{.KafkaConfigYaml}}
{{- end -}}
24 changes: 17 additions & 7 deletions operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/kustomize/kyaml/yaml"

"github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
Expand All @@ -31,7 +32,7 @@ var fs embed.FS

var (
storageConnectionCache *config.PostgresConnection
transportConnectionCache *transport.ConnCredential
transportConnectionCache *transport.KafkaConnCredential
)

type ManagerReconciler struct {
Expand Down Expand Up @@ -110,6 +111,11 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context,
return fmt.Errorf("failed to get the electionConfig %w", err)
}

kafkaConfigYaml, err := yaml.Marshal(transportConn)
if err != nil {
return fmt.Errorf("failed to marshall kafka connetion for config: %w", err)
}

managerObjects, err := hohRenderer.Render("manifests", "", func(profile string) (interface{}, error) {
return ManagerVariables{
Image: config.GetImage(config.GlobalHubManagerImageKey),
Expand All @@ -121,13 +127,15 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context,
DatabaseURL: base64.StdEncoding.EncodeToString(
[]byte(storageConn.SuperuserDatabaseURI)),
PostgresCACert: base64.StdEncoding.EncodeToString(storageConn.CACert),
KafkaClusterIdentity: transportConn.Identity,
KafkaCACert: transportConn.CACert,
KafkaClientCert: transportConn.ClientCert,
KafkaClientKey: transportConn.ClientKey,
TransportConfigSecret: constants.GHTransportConfigSecret,
KafkaConfigYaml: base64.StdEncoding.EncodeToString(kafkaConfigYaml),
KafkaClusterIdentity: transportConn.ClusterID,
KafkaBootstrapServer: transportConn.BootstrapServer,
KafkaConsumerTopic: config.ManagerStatusTopic(),
KafkaProducerTopic: config.GetSpecTopic(),
KafkaCACert: transportConn.CACert,
KafkaClientCert: transportConn.ClientCert,
KafkaClientKey: transportConn.ClientKey,
Namespace: mgh.Namespace,
MessageCompressionType: string(operatorconstants.GzipCompressType),
TransportType: string(transport.Kafka),
Expand Down Expand Up @@ -157,7 +165,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context,
return nil
}

func isMiddlewareUpdated(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) bool {
func isMiddlewareUpdated(transportConn *transport.KafkaConnCredential, storageConn *config.PostgresConnection) bool {
updated := false
if transportConnectionCache == nil || storageConnectionCache == nil {
updated = true
Expand All @@ -174,7 +182,7 @@ func isMiddlewareUpdated(transportConn *transport.ConnCredential, storageConn *c
return updated
}

func setMiddlewareCache(transportConn *transport.ConnCredential, storageConn *config.PostgresConnection) {
func setMiddlewareCache(transportConn *transport.KafkaConnCredential, storageConn *config.PostgresConnection) {
if transportConn != nil {
transportConnectionCache = transportConn
}
Expand All @@ -193,6 +201,8 @@ type ManagerVariables struct {
ProxySessionSecret string
DatabaseURL string
PostgresCACert string
TransportConfigSecret string
KafkaConfigYaml string
KafkaClusterIdentity string
KafkaCACert string
KafkaConsumerTopic string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: Secret
metadata:
name: {{.TransportConfigSecret}}
namespace: {{.Namespace}}
labels:
name: multicluster-global-hub-manager
type: Opaque
data:
"kafka.yaml": {{.KafkaConfigYaml}}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *BYOTransporter) Prune(clusterName string) error {
return nil
}

func (s *BYOTransporter) GetConnCredential(username string) (*transport.ConnCredential, error) {
func (s *BYOTransporter) GetConnCredential(clusterName string) (*transport.KafkaConnCredential, error) {
kafkaSecret := &corev1.Secret{}
err := s.runtimeClient.Get(s.ctx, types.NamespacedName{
Name: s.name,
Expand All @@ -62,11 +62,14 @@ func (s *BYOTransporter) GetConnCredential(username string) (*transport.ConnCred
if err != nil {
return nil, err
}
return &transport.ConnCredential{
Identity: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]),
return &transport.KafkaConnCredential{
ClusterID: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]),
BootstrapServer: string(kafkaSecret.Data[filepath.Join("bootstrap_server")]),
CACert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("ca.crt")]),
ClientCert: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.crt")]),
ClientKey: base64.StdEncoding.EncodeToString(kafkaSecret.Data[filepath.Join("client.key")]),
// for the byo case, the status topic isn't change by the clusterName
StatusTopic: config.GetStatusTopic(""),
SpecTopic: config.GetSpecTopic(),
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) (
config.SetTransporter(trans)

// update the transport connection
conn, err := waitTransportConn(ctx, trans, DefaultGlobalHubKafkaUserName)
conn, err := waitManagerTransportConn(ctx, trans, DefaultGlobalHubKafkaUserName)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -116,20 +116,24 @@ func StartKafkaController(ctx context.Context, mgr ctrl.Manager) (*KafkaControll
return r, nil
}

func waitTransportConn(ctx context.Context, trans *strimziTransporter, kafkaUserSecret string) (
*transport.ConnCredential, error,
func waitManagerTransportConn(ctx context.Context, trans *strimziTransporter, kafkaUserSecret string) (
*transport.KafkaConnCredential, error,
) {
// set transporter connection
var conn *transport.ConnCredential
var conn *transport.KafkaConnCredential
var err error
err = wait.PollUntilContextTimeout(ctx, 2*time.Second, 10*time.Minute, true,
func(ctx context.Context) (bool, error) {
// boostrapServer, clusterId, clusterCA
conn, err = trans.getConnCredentailByCluster()
if err != nil {
klog.Info("waiting the kafka cluster credential to be ready...", "message", err.Error())
return false, err
}

// topics
conn.SpecTopic = config.GetSpecTopic()
conn.StatusTopic = config.ManagerStatusTopic()
// clientCert and clientCA
if err := trans.loadUserCredentail(kafkaUserSecret, conn); err != nil {
klog.Info("waiting the kafka user credential to be ready...", "message", err.Error())
return false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
operatorv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates"
"github.com/stolostron/multicluster-global-hub/operator/pkg/deployer"
"github.com/stolostron/multicluster-global-hub/operator/pkg/renderer"
"github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
Expand Down Expand Up @@ -413,26 +414,39 @@ func (k *strimziTransporter) getClusterTopic(clusterName string) *transport.Clus
}

// the username is the kafkauser, it's the same as the secret name
func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.ConnCredential, error) {
func (k *strimziTransporter) GetConnCredential(clusterName string) (*transport.KafkaConnCredential, error) {
// bootstrapServer, clusterId, clusterCA
credential, err := k.getConnCredentailByCluster()
if err != nil {
return nil, err
}

userName := config.GetKafkaUserName(clusterName)
if !k.enableTLS {
k.log.Info("the kafka cluster hasn't enable tls for user", "username", userName)
return credential, nil
}
// certificates
credential.CASecretName = GetClusterCASecret(k.kafkaClusterName)
credential.ClientSecretName = certificates.AgentCertificateSecretName()

// topics
credential.StatusTopic = config.GetStatusTopic(clusterName)
credential.SpecTopic = config.GetSpecTopic()

// don't need to load the client cert/key from the kafka user, since it use the external kafkaUser
// userName := config.GetKafkaUserName(clusterName)
// if !k.enableTLS {
// k.log.Info("the kafka cluster hasn't enable tls for user", "username", userName)
// return credential, nil
// }
// if err := k.loadUserCredentail(userName, credential); err != nil {
// return nil, err
// }
return credential, nil
}

func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential *transport.ConnCredential) error {
func GetClusterCASecret(clusterName string) string {
return fmt.Sprintf("%s-cluster-ca-cert", clusterName)
}

// loadUserCredentail add credential with client cert, and key
func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential *transport.KafkaConnCredential) error {
kafkaUserSecret := &corev1.Secret{}
err := k.runtimeClient.Get(k.ctx, types.NamespacedName{
Name: kafkaUserName,
Expand All @@ -446,7 +460,8 @@ func (k *strimziTransporter) loadUserCredentail(kafkaUserName string, credential
return nil
}

func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCredential, error) {
// getConnCredentailByCluster gets credential with clusterId, bootstrapServer, and serverCA
func (k *strimziTransporter) getConnCredentailByCluster() (*transport.KafkaConnCredential, error) {
kafkaCluster := &kafkav1beta2.Kafka{}
err := k.runtimeClient.Get(k.ctx, types.NamespacedName{
Name: k.kafkaClusterName,
Expand All @@ -466,8 +481,8 @@ func (k *strimziTransporter) getConnCredentailByCluster() (*transport.ConnCreden
if kafkaCluster.Status.ClusterId != nil {
clusterIdentity = *kafkaCluster.Status.ClusterId
}
credential := &transport.ConnCredential{
Identity: clusterIdentity,
credential := &transport.KafkaConnCredential{
ClusterID: clusterIdentity,
BootstrapServer: *kafkaCluster.Status.Listeners[1].BootstrapServers,
CACert: base64.StdEncoding.EncodeToString([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])),
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ const (
PostgresCAConfigMap = "multicluster-global-hub-postgres-ca"
)

// the global hub transport config secret for manager and agent
const (
GHTransportConfigSecret = "transport-config" // #nosec G101
)

// global hub console secret/configmap names
const (
CustomAlertName = "multicluster-global-hub-custom-alerting"
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ type Transporter interface {
// Cleanup will delete the user or topic for the cluster
Prune(clusterName string) error

// get the connection credential by user
GetConnCredential(clusterName string) (*ConnCredential, error)
// get the connection credential by clusterName
GetConnCredential(clusterName string) (*KafkaConnCredential, error)
}
21 changes: 14 additions & 7 deletions pkg/transport/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,20 @@ type ClusterTopic struct {
StatusTopic string
}

// ConnCredential is used to connect the transporter instance
type ConnCredential struct {
Identity string
BootstrapServer string
CACert string
ClientCert string
ClientKey string
// KafkaConnCredential is used to connect the transporter instance. The field is persisted to secret
// need to be encode with base64.StdEncoding.EncodeToString
type KafkaConnCredential struct {
BootstrapServer string `yaml:"bootstrap.server"`
StatusTopic string `yaml:"topic.status,omitempty"`
SpecTopic string `yaml:"topic.spec,omitempty"`
ClusterID string `yaml:"cluster.id,omitempty"`
// the following fields are only for the manager, and the agent of byo/standalone kafka
CACert string `yaml:"ca.key,omitempty"`
ClientCert string `yaml:"client.crt,omitempty"`
ClientKey string `yaml:"client.key,omitempty"`
// the following fields are only for the agent of built-in kafka
CASecretName string `yaml:"ca.secret,omitempty"`
ClientSecretName string `yaml:"client.secret,omitempty"`
}

type EventPosition struct {
Expand Down
6 changes: 3 additions & 3 deletions test/integration/operator/addon/addon_deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var _ = Describe("addon deploy", func() {
}, work)
}, timeout, interval).ShouldNot(HaveOccurred())

Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9))
Expect(len(work.Spec.Workload.Manifests)).Should(Equal(10))
})

It("Should create HoH agent and ACM when an OCP is imported", func() {
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("addon deploy", func() {
}, timeout, interval).ShouldNot(HaveOccurred())

// contains both the ACM and the Global Hub manifests
Expect(len(work.Spec.Workload.Manifests)).Should(Equal(18))
Expect(len(work.Spec.Workload.Manifests)).Should(Equal(19))
})

It("Should create HoH addon when an OCP with deploy mode = default is imported in hosted mode", func() {
Expand Down Expand Up @@ -192,7 +192,7 @@ var _ = Describe("addon deploy", func() {
}, work)
}, timeout, interval).ShouldNot(HaveOccurred())

Expect(len(work.Spec.Workload.Manifests)).Should(Equal(9))
Expect(len(work.Spec.Workload.Manifests)).Should(Equal(10))
})

It("Should create HoH addon when an OCP with deploy mode = Hosted is imported in hosted mode", func() {
Expand Down
Loading

0 comments on commit 6e393b6

Please sign in to comment.