Skip to content

Commit

Permalink
✨ Add tranposrt config secret for agent and manager (#1060)
Browse files Browse the repository at this point in the history
* modify the agent and manager

Signed-off-by: myan <[email protected]>

fix the integration

Signed-off-by: myan <[email protected]>

fix the integration test

Signed-off-by: myan <[email protected]>

fix the ut

Signed-off-by: myan <[email protected]>

* remove the mount certs

Signed-off-by: myan <[email protected]>

* add ut

Signed-off-by: myan <[email protected]>

* remove duplicate controller

Signed-off-by: myan <[email protected]>

* fix e2e

Signed-off-by: myan <[email protected]>

* race condition

Signed-off-by: myan <[email protected]>

* wait until the connection is ready

Signed-off-by: myan <[email protected]>

* all broker are down, retrying

Signed-off-by: myan <[email protected]>

* reply review

Signed-off-by: myan <[email protected]>

---------

Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa authored Aug 28, 2024
1 parent 668ea1b commit ebcf16e
Show file tree
Hide file tree
Showing 39 changed files with 725 additions and 410 deletions.
85 changes: 34 additions & 51 deletions agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
clusterv1 "open-cluster-management.io/api/cluster/v1"
Expand All @@ -34,7 +33,7 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/jobs"
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

Expand Down Expand Up @@ -106,11 +105,11 @@ func parseFlags() *config.AgentConfig {
agentConfig := &config.AgentConfig{
ElectionConfig: &commonobjects.LeaderElectionConfig{},
TransportConfig: &transport.TransportConfig{
KafkaConfig: &transport.KafkaConfig{
Topics: &transport.ClusterTopic{},
ProducerConfig: &transport.KafkaProducerConfig{},
ConsumerConfig: &transport.KafkaConsumerConfig{},
},
// IsManager specifies the send/receive topics from specTopic and statusTopic
// For example, SpecTopic sends and statusTopic receives on the manager; the agent is the opposite
IsManager: false,
// EnableDatabaseOffset affects only the manager, deciding if consumption starts from a database-stored offset
EnableDatabaseOffset: false,
},
}

Expand All @@ -121,24 +120,6 @@ func parseFlags() *config.AgentConfig {
pflag.CommandLine.AddGoFlagSet(defaultFlags)

pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "",
"The bootstrap server for kafka.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "",
"Producer Id for the kafka, default is the leaf hub name.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic",
"event", "Topic for the kafka producer.")
pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic",
"spec", "Topic for the kafka consumer.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id",
"multicluster-global-hub-agent", "ID for the kafka consumer.")
pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace,
"The agent running namespace, also used as leader election namespace")
pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka",
Expand All @@ -147,9 +128,6 @@ func parseFlags() *config.AgentConfig {
"The goroutine number to propagate the bundles on managed cluster.")
pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false,
"enable hoh RBAC or not, default false")
pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType,
"transport-message-compression-type", "gzip",
"The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor,
"status-delta-count-switch-factor", 100,
"default with 100.")
Expand Down Expand Up @@ -180,19 +158,11 @@ func completeConfig(agentConfig *config.AgentConfig) error {
if agentConfig.LeafHubName == "" {
return fmt.Errorf("flag managed-hub-name can't be empty")
}
if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID == "" {
agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID = agentConfig.LeafHubName
}
agentConfig.TransportConfig.ConsumerGroupId = agentConfig.LeafHubName
if agentConfig.SpecWorkPoolSize < 1 ||
agentConfig.SpecWorkPoolSize > 100 {
return fmt.Errorf("flag consumer-worker-pool-size should be in the scope [1, 100]")
}

if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > producer.MaxMessageKBLimit {
return fmt.Errorf("flag kafka-message-size-limit %d must not exceed %d",
agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, producer.MaxMessageKBLimit)
}
agentConfig.TransportConfig.KafkaConfig.EnableTLS = true
if agentConfig.MetricsAddress == "" {
agentConfig.MetricsAddress = fmt.Sprintf("%s:%d", metricsHost, metricsPort)
}
Expand Down Expand Up @@ -236,25 +206,38 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
if err != nil {
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kubeclient: %w", err)
}
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
err = controller.NewTransportCtrl(
agentConfig.PodNameSpace,
constants.GHTransportConfigSecret,
transportCallback(mgr, agentConfig),
agentConfig.TransportConfig,
).SetupWithManager(mgr)
if err != nil {
return nil, err
}
setupLog.Info("add the transport controller to agent")
return mgr, nil
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
}
// if the transport consumer and producer is ready then the func will be invoked by the transport controller
func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

return mgr, nil
if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig, producer, consumer); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

setupLog.Info("add the agent controllers to manager")
return nil
}
}

func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) {
Expand Down
13 changes: 12 additions & 1 deletion agent/pkg/config/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var leaseCtrlStarted bool

const (
leaseUpdateJitterFactor = 0.25
defaultLeaseDurationSeconds = 60
Expand All @@ -36,6 +38,10 @@ type leaseUpdater struct {

// AddHoHLeaseUpdater creates a new LeaseUpdater instance aand add it to given manager
func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) error {
if leaseCtrlStarted {
return nil
}

var config *rest.Config
if isAgentTesting, ok := os.LookupEnv("AGENT_TESTING"); ok && isAgentTesting == "true" {
config = mgr.GetConfig()
Expand All @@ -59,7 +65,7 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro
return err
}

return mgr.Add(&leaseUpdater{
err = mgr.Add(&leaseUpdater{
log: ctrl.Log.WithName("multicluster-global-hub-lease-updater"),
client: c,
leaseName: addonName,
Expand All @@ -72,6 +78,11 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro
"name=multicluster-global-hub-agent"),
},
})
if err != nil {
return err
}
leaseCtrlStarted = true
return nil
}

// Start starts a goroutine to update lease to implement controller-runtime Runnable interface
Expand Down
63 changes: 0 additions & 63 deletions agent/pkg/controllers/cert_controller.go

This file was deleted.

26 changes: 21 additions & 5 deletions agent/pkg/controllers/crd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@ import (
"github.com/stolostron/multicluster-global-hub/agent/pkg/config"
specController "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller"
statusController "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

var crdCtrlStarted = false

type crdController struct {
mgr ctrl.Manager
log logr.Logger
restConfig *rest.Config
agentConfig *config.AgentConfig
producer transport.Producer
consumer transport.Consumer
}

func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
reqLogger := c.log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.V(2).Info("crd controller", "NamespacedName:", request.NamespacedName)

// add spec controllers
if err := specController.AddToManager(c.mgr, c.agentConfig); err != nil {
if err := specController.AddToManager(c.mgr, c.consumer, c.agentConfig); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to add spec syncer: %w", err)
}
reqLogger.V(2).Info("add spec controllers to manager")

if err := statusController.AddControllers(ctx, c.mgr, c.agentConfig); err != nil {
if err := statusController.AddControllers(ctx, c.mgr, c.producer, c.agentConfig); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to add status syncer: %w", err)
}

Expand All @@ -56,8 +61,13 @@ func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ct

// this controller is used to watch the multiclusterhub crd or clustermanager crd
// if the crd exists, then add controllers to the manager dynamically
func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig) error {
return ctrl.NewControllerManagedBy(mgr).
func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig,
producer transport.Producer, consumer transport.Consumer,
) error {
if crdCtrlStarted {
return nil
}
if err := ctrl.NewControllerManagedBy(mgr).
For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.Funcs{
// trigger the reconciler only if the crd is created
CreateFunc: func(e event.CreateEvent) bool {
Expand All @@ -77,6 +87,12 @@ func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *co
mgr: mgr,
restConfig: restConfig,
agentConfig: agentConfig,
producer: producer,
consumer: consumer,
log: ctrl.Log.WithName("crd-controller"),
})
}); err != nil {
return err
}
crdCtrlStarted = true
return nil
}
12 changes: 11 additions & 1 deletion agent/pkg/controllers/hub_clusterclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

var clusterClaimCtrlStared = false

type hubClusterClaimController struct {
client client.Client
log logr.Logger
Expand All @@ -40,19 +42,27 @@ func (c *hubClusterClaimController) Reconcile(ctx context.Context, request ctrl.
func AddHubClusterClaimController(mgr ctrl.Manager) error {
// the controller is only to trigger create hub clusterClaim at the beginning
// do nothing if the hub clusterClaim existed
if clusterClaimCtrlStared {
return nil
}
clusterClaimPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool {
if object.GetName() == constants.HubClusterClaimName {
return false
}
clusterClaim, _ := getClusterClaim(context.Background(), mgr.GetClient(), constants.HubClusterClaimName)
return clusterClaim == nil
})
return ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller").
err := ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller").
For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)).
Complete(&hubClusterClaimController{
client: mgr.GetClient(),
log: ctrl.Log.WithName("hubclusterclaim-controller"),
})
if err != nil {
return err
}
clusterClaimCtrlStared = true
return nil
}

func updateHubClusterClaim(ctx context.Context, k8sClient client.Client,
Expand Down
12 changes: 11 additions & 1 deletion agent/pkg/controllers/version_clusterclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/constants"
)

var clusterVersionCtrlStarted = false

type versionClusterClaimController struct {
client client.Client
log logr.Logger
Expand All @@ -46,16 +48,24 @@ func (c *versionClusterClaimController) Reconcile(ctx context.Context, request c
}

func AddVersionClusterClaimController(mgr ctrl.Manager) error {
if clusterVersionCtrlStarted {
return nil
}
clusterClaimPredicate, _ := predicate.LabelSelectorPredicate(metav1.LabelSelector{
MatchLabels: map[string]string{
constants.GlobalHubOwnerLabelKey: constants.GHAgentOwnerLabelValue,
},
})
return ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller").
err := ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller").
For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)).
Watches(&mchv1.MultiClusterHub{}, &handler.EnqueueRequestForObject{}).
Complete(&versionClusterClaimController{
client: mgr.GetClient(),
log: ctrl.Log.WithName("clusterclaim-controller"),
})
if err != nil {
return err
}
clusterVersionCtrlStarted = true
return nil
}
Loading

0 comments on commit ebcf16e

Please sign in to comment.