Skip to content

Commit

Permalink
add updated samples (#1058)
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa authored Aug 19, 2024
1 parent 6e393b6 commit 346fb2b
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 11 deletions.
110 changes: 109 additions & 1 deletion pkg/transport/config/confluent_config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package config

import (
"context"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"os"
"path/filepath"

kafkav2 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kyaml/yaml"

"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
Expand All @@ -25,7 +33,7 @@ func GetBasicConfigMap() *kafkav2.ConfigMap {
func SetProducerConfig(kafkaConfigMap *kafkav2.ConfigMap) {
_ = kafkaConfigMap.SetKey("go.produce.channel.size", 1000)
_ = kafkaConfigMap.SetKey("acks", "1")
_ = kafkaConfigMap.SetKey("retries", "0")
_ = kafkaConfigMap.SetKey("retries", "1")
_ = kafkaConfigMap.SetKey("go.events.channel.size", 1000)
}

Expand Down Expand Up @@ -94,3 +102,103 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*
}
return kafkaConfigMap, nil
}

// GetConfluentConfigMapByConfig tries to connect the kafka with transport secret(ca.key, client.crt, client.key)
func GetConfluentConfigMapByConfig(transportConfig *corev1.Secret, c client.Client, consumerGroupID string) (
*kafkav2.ConfigMap, error,
) {
kafkaConfigMap := GetBasicConfigMap()
if consumerGroupID != "" {
SetConsumerConfig(kafkaConfigMap, consumerGroupID)
} else {
SetProducerConfig(kafkaConfigMap)
}
conn, err := GetTransportCredentailBySecret(transportConfig, c)
if err != nil {
return nil, err
}
_ = kafkaConfigMap.SetKey("bootstrap.servers", conn.BootstrapServer)
// if the certs is invalid
if conn.CACert == "" || conn.ClientCert == "" || conn.ClientKey == "" {
klog.Warning("Connect to Kafka without SSL")
return kafkaConfigMap, nil
}

_ = kafkaConfigMap.SetKey("security.protocol", "ssl")
if err := kafkaConfigMap.SetKey("ssl.ca.pem", conn.CACert); err != nil {
return nil, err
}

if err := kafkaConfigMap.SetKey("ssl.certificate.pem", conn.ClientCert); err != nil {
return nil, err
}

if err := kafkaConfigMap.SetKey("ssl.key.pem", conn.ClientKey); err != nil {
return nil, err
}

return kafkaConfigMap, nil
}

func GetTransportCredentailBySecret(transportConfig *corev1.Secret, c client.Client) (
*transport.KafkaConnCredential, error,
) {
kafkaConfig, ok := transportConfig.Data["kafka.yaml"]
if !ok {
return nil, fmt.Errorf("must set the `kafka.yaml` in the transport secret(%s)", transportConfig.Name)
}
conn := &transport.KafkaConnCredential{}
if err := yaml.Unmarshal(kafkaConfig, conn); err != nil {
return nil, fmt.Errorf("failed to unmarshal kafka config to transport credentail: %w", err)
}

// decode the ca and client cert
if conn.CACert != "" {
bytes, err := base64.StdEncoding.DecodeString(conn.CACert)
if err != nil {
return nil, err
}
conn.CACert = string(bytes)
}
if conn.ClientCert != "" {
bytes, err := base64.StdEncoding.DecodeString(conn.ClientCert)
if err != nil {
return nil, err
}
conn.ClientCert = string(bytes)
}
if conn.ClientKey != "" {
bytes, err := base64.StdEncoding.DecodeString(conn.ClientKey)
if err != nil {
return nil, err
}
conn.ClientKey = string(bytes)
}

if conn.CASecretName != "" {
caSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: transportConfig.Namespace,
Name: conn.CASecretName,
},
}
if err := c.Get(context.Background(), client.ObjectKeyFromObject(caSecret), caSecret); err != nil {
return nil, err
}
conn.CACert = string(caSecret.Data["ca.crt"])
}
if conn.ClientSecretName != "" {
clientSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: transportConfig.Namespace,
Name: conn.ClientSecretName,
},
}
if err := c.Get(context.Background(), client.ObjectKeyFromObject(clientSecret), clientSecret); err != nil {
return nil, fmt.Errorf("failed to get the client cert: %w", err)
}
conn.ClientCert = string(clientSecret.Data["tls.crt"])
conn.ClientKey = string(clientSecret.Data["tls.key"])
}
return conn, nil
}
6 changes: 1 addition & 5 deletions samples/cloudevent/confluent-receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
kafka_confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
transconfig "github.com/stolostron/multicluster-global-hub/pkg/transport/config"
"github.com/stolostron/multicluster-global-hub/samples/config"
)

Expand All @@ -26,13 +25,10 @@ func main() {
topic := os.Args[1]
ctx := context.Background()

// export BOOTSTRAP_SERVER and HUB if running it on managed hub
configmap, err := config.GetConfluentConfigMap(false)
configmap, err := config.GetConfluentConfigMapByTranportConfig("", "test-consumer-Id")
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
transconfig.SetConsumerConfig(configmap, "test-group-id")

receiver, err := kafka_confluent.New(kafka_confluent.WithConfigMap(configmap),
kafka_confluent.WithReceiverTopics([]string{topic}))
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions samples/cloudevent/confluent-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
transconfig "github.com/stolostron/multicluster-global-hub/pkg/transport/config"
"github.com/stolostron/multicluster-global-hub/samples/config"
)

Expand All @@ -26,13 +25,10 @@ func main() {
topic := os.Args[1]

ctx, cancel := context.WithCancel(context.Background())

configmap, err := config.GetConfluentConfigMap(true)
configmap, err := config.GetConfluentConfigMapByTranportConfig("multicluster-global-hub-agent", "")
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
transconfig.SetProducerConfig(configmap)

sender, err := kafka_confluent.New(kafka_confluent.WithConfigMap(configmap),
kafka_confluent.WithSenderTopic(topic))
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions samples/config/confluent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
operatorconfig "github.com/stolostron/multicluster-global-hub/operator/pkg/config"
"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates"
"github.com/stolostron/multicluster-global-hub/pkg/constants"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -234,3 +236,34 @@ func GetConfluentConfigMapByUser(c client.Client, namespace, clusterName, userNa
}
return nil, fmt.Errorf("kafka cluster %s/%s is not ready", namespace, clusterName)
}

func GetConfluentConfigMapByTranportConfig(namespace, consumerGroupID string) (*kafka.ConfigMap, error) {
kubeconfig, err := DefaultKubeConfig()
if err != nil {
return nil, fmt.Errorf("failed to get kubeconfig")
}
c, err := client.New(kubeconfig, client.Options{Scheme: operatorconfig.GetRuntimeScheme()})
if err != nil {
return nil, fmt.Errorf("failed to get runtime client")
}

if namespace == "" {
namespace = KAFKA_NAMESPACE
}

transportConfig := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: constants.GHTransportConfigSecret,
},
}
err = c.Get(context.Background(), client.ObjectKeyFromObject(transportConfig), transportConfig)
if err != nil {
return nil, err
}
configMap, err := config.GetConfluentConfigMapByConfig(transportConfig, c, consumerGroupID)
if err != nil {
return nil, err
}
return configMap, nil
}

0 comments on commit 346fb2b

Please sign in to comment.