From 346fb2be4688614b90e5ec545234d96acb63a84b Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Mon, 19 Aug 2024 21:32:27 +0800 Subject: [PATCH] add updated samples (#1058) Signed-off-by: myan --- pkg/transport/config/confluent_config.go | 110 +++++++++++++++++- samples/cloudevent/confluent-receiver/main.go | 6 +- samples/cloudevent/confluent-sender/main.go | 6 +- samples/config/confluent_config.go | 33 ++++++ 4 files changed, 144 insertions(+), 11 deletions(-) diff --git a/pkg/transport/config/confluent_config.go b/pkg/transport/config/confluent_config.go index 820f931ee..b2909e7dd 100644 --- a/pkg/transport/config/confluent_config.go +++ b/pkg/transport/config/confluent_config.go @@ -1,12 +1,20 @@ package config import ( + "context" "crypto/x509" + "encoding/base64" "errors" + "fmt" "os" "path/filepath" kafkav2 "github.com/confluentinc/confluent-kafka-go/v2/kafka" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/kustomize/kyaml/yaml" "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/utils" @@ -25,7 +33,7 @@ func GetBasicConfigMap() *kafkav2.ConfigMap { func SetProducerConfig(kafkaConfigMap *kafkav2.ConfigMap) { _ = kafkaConfigMap.SetKey("go.produce.channel.size", 1000) _ = kafkaConfigMap.SetKey("acks", "1") - _ = kafkaConfigMap.SetKey("retries", "0") + _ = kafkaConfigMap.SetKey("retries", "1") _ = kafkaConfigMap.SetKey("go.events.channel.size", 1000) } @@ -94,3 +102,103 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (* } return kafkaConfigMap, nil } + +// GetConfluentConfigMapByConfig tries to connect the kafka with transport secret(ca.key, client.crt, client.key) +func GetConfluentConfigMapByConfig(transportConfig *corev1.Secret, c client.Client, consumerGroupID string) ( + *kafkav2.ConfigMap, error, +) { + kafkaConfigMap := GetBasicConfigMap() + if consumerGroupID != "" { + SetConsumerConfig(kafkaConfigMap, consumerGroupID) + } else { + SetProducerConfig(kafkaConfigMap) + } + conn, err := GetTransportCredentailBySecret(transportConfig, c) + if err != nil { + return nil, err + } + _ = kafkaConfigMap.SetKey("bootstrap.servers", conn.BootstrapServer) + // if the certs is invalid + if conn.CACert == "" || conn.ClientCert == "" || conn.ClientKey == "" { + klog.Warning("Connect to Kafka without SSL") + return kafkaConfigMap, nil + } + + _ = kafkaConfigMap.SetKey("security.protocol", "ssl") + if err := kafkaConfigMap.SetKey("ssl.ca.pem", conn.CACert); err != nil { + return nil, err + } + + if err := kafkaConfigMap.SetKey("ssl.certificate.pem", conn.ClientCert); err != nil { + return nil, err + } + + if err := kafkaConfigMap.SetKey("ssl.key.pem", conn.ClientKey); err != nil { + return nil, err + } + + return kafkaConfigMap, nil +} + +func GetTransportCredentailBySecret(transportConfig *corev1.Secret, c client.Client) ( + *transport.KafkaConnCredential, error, +) { + kafkaConfig, ok := transportConfig.Data["kafka.yaml"] + if !ok { + return nil, fmt.Errorf("must set the `kafka.yaml` in the transport secret(%s)", transportConfig.Name) + } + conn := &transport.KafkaConnCredential{} + if err := yaml.Unmarshal(kafkaConfig, conn); err != nil { + return nil, fmt.Errorf("failed to unmarshal kafka config to transport credentail: %w", err) + } + + // decode the ca and client cert + if conn.CACert != "" { + bytes, err := base64.StdEncoding.DecodeString(conn.CACert) + if err != nil { + return nil, err + } + conn.CACert = string(bytes) + } + if conn.ClientCert != "" { + bytes, err := base64.StdEncoding.DecodeString(conn.ClientCert) + if err != nil { + return nil, err + } + conn.ClientCert = string(bytes) + } + if conn.ClientKey != "" { + bytes, err := base64.StdEncoding.DecodeString(conn.ClientKey) + if err != nil { + return nil, err + } + conn.ClientKey = string(bytes) + } + + if conn.CASecretName != "" { + caSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: transportConfig.Namespace, + Name: conn.CASecretName, + }, + } + if err := c.Get(context.Background(), client.ObjectKeyFromObject(caSecret), caSecret); err != nil { + return nil, err + } + conn.CACert = string(caSecret.Data["ca.crt"]) + } + if conn.ClientSecretName != "" { + clientSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: transportConfig.Namespace, + Name: conn.ClientSecretName, + }, + } + if err := c.Get(context.Background(), client.ObjectKeyFromObject(clientSecret), clientSecret); err != nil { + return nil, fmt.Errorf("failed to get the client cert: %w", err) + } + conn.ClientCert = string(clientSecret.Data["tls.crt"]) + conn.ClientKey = string(clientSecret.Data["tls.key"]) + } + return conn, nil +} diff --git a/samples/cloudevent/confluent-receiver/main.go b/samples/cloudevent/confluent-receiver/main.go index c0c5eda55..3e2588cc3 100644 --- a/samples/cloudevent/confluent-receiver/main.go +++ b/samples/cloudevent/confluent-receiver/main.go @@ -10,7 +10,6 @@ import ( kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/client" - transconfig "github.com/stolostron/multicluster-global-hub/pkg/transport/config" "github.com/stolostron/multicluster-global-hub/samples/config" ) @@ -26,13 +25,10 @@ func main() { topic := os.Args[1] ctx := context.Background() - // export BOOTSTRAP_SERVER and HUB if running it on managed hub - configmap, err := config.GetConfluentConfigMap(false) + configmap, err := config.GetConfluentConfigMapByTranportConfig("", "test-consumer-Id") if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } - transconfig.SetConsumerConfig(configmap, "test-group-id") - receiver, err := kafka_confluent.New(kafka_confluent.WithConfigMap(configmap), kafka_confluent.WithReceiverTopics([]string{topic})) if err != nil { diff --git a/samples/cloudevent/confluent-sender/main.go b/samples/cloudevent/confluent-sender/main.go index 5d4bf5c3a..5d2fe7014 100644 --- a/samples/cloudevent/confluent-sender/main.go +++ b/samples/cloudevent/confluent-sender/main.go @@ -10,7 +10,6 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/google/uuid" - transconfig "github.com/stolostron/multicluster-global-hub/pkg/transport/config" "github.com/stolostron/multicluster-global-hub/samples/config" ) @@ -26,13 +25,10 @@ func main() { topic := os.Args[1] ctx, cancel := context.WithCancel(context.Background()) - - configmap, err := config.GetConfluentConfigMap(true) + configmap, err := config.GetConfluentConfigMapByTranportConfig("multicluster-global-hub-agent", "") if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } - transconfig.SetProducerConfig(configmap) - sender, err := kafka_confluent.New(kafka_confluent.WithConfigMap(configmap), kafka_confluent.WithSenderTopic(topic)) if err != nil { diff --git a/samples/config/confluent_config.go b/samples/config/confluent_config.go index 21c922fb8..66a25c629 100644 --- a/samples/config/confluent_config.go +++ b/samples/config/confluent_config.go @@ -10,9 +10,11 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" operatorconfig "github.com/stolostron/multicluster-global-hub/operator/pkg/config" "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates" + "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/transport/config" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -234,3 +236,34 @@ func GetConfluentConfigMapByUser(c client.Client, namespace, clusterName, userNa } return nil, fmt.Errorf("kafka cluster %s/%s is not ready", namespace, clusterName) } + +func GetConfluentConfigMapByTranportConfig(namespace, consumerGroupID string) (*kafka.ConfigMap, error) { + kubeconfig, err := DefaultKubeConfig() + if err != nil { + return nil, fmt.Errorf("failed to get kubeconfig") + } + c, err := client.New(kubeconfig, client.Options{Scheme: operatorconfig.GetRuntimeScheme()}) + if err != nil { + return nil, fmt.Errorf("failed to get runtime client") + } + + if namespace == "" { + namespace = KAFKA_NAMESPACE + } + + transportConfig := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: constants.GHTransportConfigSecret, + }, + } + err = c.Get(context.Background(), client.ObjectKeyFromObject(transportConfig), transportConfig) + if err != nil { + return nil, err + } + configMap, err := config.GetConfluentConfigMapByConfig(transportConfig, c, consumerGroupID) + if err != nil { + return nil, err + } + return configMap, nil +}