✨ Add transport config secret for agent and manager #1060

Merged
merged 9 commits into from
Aug 28, 2024
Changes from all commits
85 changes: 34 additions & 51 deletions agent/cmd/agent/main.go
@@ -13,7 +13,6 @@ import (
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
clusterv1 "open-cluster-management.io/api/cluster/v1"
@@ -34,7 +33,7 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/jobs"
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

@@ -106,11 +105,11 @@ func parseFlags() *config.AgentConfig {
agentConfig := &config.AgentConfig{
ElectionConfig: &commonobjects.LeaderElectionConfig{},
TransportConfig: &transport.TransportConfig{
KafkaConfig: &transport.KafkaConfig{
Topics: &transport.ClusterTopic{},
ProducerConfig: &transport.KafkaProducerConfig{},
ConsumerConfig: &transport.KafkaConsumerConfig{},
},
// IsManager selects the send/receive roles of the spec and status topics:
// the manager sends on the spec topic and receives on the status topic; the agent does the opposite
IsManager: false,
// EnableDatabaseOffset affects only the manager, deciding whether consumption starts from an offset stored in the database
EnableDatabaseOffset: false,
Contributor review comment: it is worth adding comments for those 2 fields.
},
}
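
The two fields above replace the per-flag Kafka wiring this commit deletes below. As a minimal sketch (not code from this PR), the topic-role selection the comments describe could look like the following, with topic names taken from the old flag defaults, "spec" and "event":

```go
package main

import "fmt"

// resolveTopics is illustrative only: the manager sends on the spec topic
// and receives on the status topic; the agent does the opposite.
func resolveTopics(isManager bool, specTopic, statusTopic string) (send, receive string) {
	if isManager {
		return specTopic, statusTopic
	}
	return statusTopic, specTopic
}

func main() {
	// The agent case, using the defaults from the removed flags.
	send, receive := resolveTopics(false, "spec", "event")
	fmt.Printf("agent sends on %q and receives on %q\n", send, receive)
}
```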

Expand All @@ -121,24 +120,6 @@ func parseFlags() *config.AgentConfig {
pflag.CommandLine.AddGoFlagSet(defaultFlags)

pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "",
"The bootstrap server for kafka.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "",
"Producer Id for the kafka, default is the leaf hub name.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic",
"event", "Topic for the kafka producer.")
pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic",
"spec", "Topic for the kafka consumer.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id",
"multicluster-global-hub-agent", "ID for the kafka consumer.")
pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace,
"The agent running namespace, also used as leader election namespace")
pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka",
@@ -147,9 +128,6 @@
"The goroutine number to propagate the bundles on managed cluster.")
pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false,
"enable hoh RBAC or not, default false")
pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType,
"transport-message-compression-type", "gzip",
"The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor,
"status-delta-count-switch-factor", 100,
"default with 100.")
@@ -180,19 +158,11 @@ func completeConfig(agentConfig *config.AgentConfig) error {
if agentConfig.LeafHubName == "" {
return fmt.Errorf("flag managed-hub-name can't be empty")
}
if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID == "" {
agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID = agentConfig.LeafHubName
}
agentConfig.TransportConfig.ConsumerGroupId = agentConfig.LeafHubName
if agentConfig.SpecWorkPoolSize < 1 ||
agentConfig.SpecWorkPoolSize > 100 {
return fmt.Errorf("flag consumer-worker-pool-size should be in the scope [1, 100]")
}

if agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB > producer.MaxMessageKBLimit {
return fmt.Errorf("flag kafka-message-size-limit %d must not exceed %d",
agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB, producer.MaxMessageKBLimit)
}
agentConfig.TransportConfig.KafkaConfig.EnableTLS = true
if agentConfig.MetricsAddress == "" {
agentConfig.MetricsAddress = fmt.Sprintf("%s:%d", metricsHost, metricsPort)
}
@@ -236,25 +206,38 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
if err != nil {
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kubeclient: %w", err)
}
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether to install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
err = controller.NewTransportCtrl(
agentConfig.PodNameSpace,
constants.GHTransportConfigSecret,
transportCallback(mgr, agentConfig),
agentConfig.TransportConfig,
).SetupWithManager(mgr)
if err != nil {
return nil, err
}
setupLog.Info("add the transport controller to agent")
return mgr, nil
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
}
// once the transport consumer and producer are ready, this func will be invoked by the transport controller
func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether to install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

return mgr, nil
if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig, producer, consumer); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

setupLog.Info("add the agent controllers to manager")
return nil
}
}
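
The TransportCallback type this function returns belongs to the new pkg/transport/controller package. A hedged sketch of its declaration, inferred from the signature above rather than copied from the PR; the assumed behaviour is that the transport controller watches the transport-config secret, builds the producer/consumer pair from it, and then invokes the registered callback:

```go
package controller

import "github.com/stolostron/multicluster-global-hub/pkg/transport"

// TransportCallback is invoked once the producer and consumer built from
// the transport-config secret are ready. Declaration inferred from the
// transportCallback function in agent/cmd/agent/main.go.
type TransportCallback func(producer transport.Producer, consumer transport.Consumer) error
```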

func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) {
13 changes: 12 additions & 1 deletion agent/pkg/config/lease.go
@@ -19,6 +19,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var leaseCtrlStarted bool

const (
leaseUpdateJitterFactor = 0.25
defaultLeaseDurationSeconds = 60
@@ -36,6 +38,10 @@ type leaseUpdater struct {

// AddHoHLeaseUpdater creates a new LeaseUpdater instance and adds it to the given manager
func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) error {
if leaseCtrlStarted {
return nil
}

var config *rest.Config
if isAgentTesting, ok := os.LookupEnv("AGENT_TESTING"); ok && isAgentTesting == "true" {
config = mgr.GetConfig()
@@ -59,7 +65,7 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro
return err
}

return mgr.Add(&leaseUpdater{
err = mgr.Add(&leaseUpdater{
log: ctrl.Log.WithName("multicluster-global-hub-lease-updater"),
client: c,
leaseName: addonName,
@@ -72,6 +78,11 @@ func AddHoHLeaseUpdater(mgr ctrl.Manager, addonNamespace, addonName string) erro
"name=multicluster-global-hub-agent"),
},
})
if err != nil {
return err
}
leaseCtrlStarted = true
return nil
}

// Start starts a goroutine to update lease to implement controller-runtime Runnable interface
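
The leaseCtrlStarted flag above is the same guard this PR adds to every Add* entry point (crdCtrlStarted and the clusterclaim controllers below): because the transport callback can fire more than once, controller registration must be idempotent. A standalone sketch of the pattern, with illustrative names rather than the repo's real types:

```go
package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// exampleCtrlStarted makes AddExampleController idempotent, so a repeated
// transport callback cannot register the same controller twice.
var exampleCtrlStarted bool

type exampleReconciler struct{}

func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

func AddExampleController(mgr ctrl.Manager) error {
	if exampleCtrlStarted {
		return nil // an earlier call already registered the controller
	}
	err := ctrl.NewControllerManagedBy(mgr).
		Named("example-controller").
		For(&corev1.ConfigMap{}).
		Complete(&exampleReconciler{})
	if err != nil {
		return err
	}
	exampleCtrlStarted = true
	return nil
}
```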
63 changes: 0 additions & 63 deletions agent/pkg/controllers/cert_controller.go

This file was deleted.

26 changes: 21 additions & 5 deletions agent/pkg/controllers/crd_controller.go
@@ -18,26 +18,31 @@ import (
"github.com/stolostron/multicluster-global-hub/agent/pkg/config"
specController "github.com/stolostron/multicluster-global-hub/agent/pkg/spec/controller"
statusController "github.com/stolostron/multicluster-global-hub/agent/pkg/status/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

var crdCtrlStarted = false

type crdController struct {
mgr ctrl.Manager
log logr.Logger
restConfig *rest.Config
agentConfig *config.AgentConfig
producer transport.Producer
consumer transport.Consumer
}

func (c *crdController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
reqLogger := c.log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.V(2).Info("crd controller", "NamespacedName:", request.NamespacedName)

// add spec controllers
if err := specController.AddToManager(c.mgr, c.agentConfig); err != nil {
if err := specController.AddToManager(c.mgr, c.consumer, c.agentConfig); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to add spec syncer: %w", err)
}
reqLogger.V(2).Info("add spec controllers to manager")

if err := statusController.AddControllers(ctx, c.mgr, c.agentConfig); err != nil {
if err := statusController.AddControllers(ctx, c.mgr, c.producer, c.agentConfig); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to add status syncer: %w", err)
}

@@ -56,8 +61,13 @@

// this controller is used to watch the multiclusterhub crd or clustermanager crd
// if the crd exists, then add controllers to the manager dynamically
func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig) error {
return ctrl.NewControllerManagedBy(mgr).
func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *config.AgentConfig,
producer transport.Producer, consumer transport.Consumer,
) error {
if crdCtrlStarted {
return nil
}
if err := ctrl.NewControllerManagedBy(mgr).
For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.Funcs{
// trigger the reconciler only if the crd is created
CreateFunc: func(e event.CreateEvent) bool {
@@ -77,6 +87,12 @@ func AddCRDController(mgr ctrl.Manager, restConfig *rest.Config, agentConfig *co
mgr: mgr,
restConfig: restConfig,
agentConfig: agentConfig,
producer: producer,
consumer: consumer,
log: ctrl.Log.WithName("crd-controller"),
})
}); err != nil {
return err
}
crdCtrlStarted = true
return nil
}
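
A hedged sketch of the CRD-gating pattern used above: reconcile fires only when a watched CRD is created, which is the point where the controllers that depend on it can safely be added. Names below are placeholders, not the agent's real types:

```go
package controllers

import (
	"context"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

type crdGate struct{}

func (g *crdGate) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Register the controllers that depend on this CRD here.
	return ctrl.Result{}, nil
}

func addCRDGate(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.Funcs{
			// trigger the reconciler only if the crd is created
			CreateFunc:  func(e event.CreateEvent) bool { return true },
			UpdateFunc:  func(e event.UpdateEvent) bool { return false },
			DeleteFunc:  func(e event.DeleteEvent) bool { return false },
			GenericFunc: func(e event.GenericEvent) bool { return false },
		})).
		Complete(&crdGate{})
}
```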
12 changes: 11 additions & 1 deletion agent/pkg/controllers/hub_clusterclaim_controller.go
@@ -24,6 +24,8 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

var clusterClaimCtrlStarted = false

type hubClusterClaimController struct {
client client.Client
log logr.Logger
@@ -40,19 +42,27 @@ func (c *hubClusterClaimController) Reconcile(ctx context.Context, request ctrl.
func AddHubClusterClaimController(mgr ctrl.Manager) error {
// the controller is only to trigger create hub clusterClaim at the beginning
// do nothing if the hub clusterClaim existed
if clusterClaimCtrlStarted {
return nil
}
clusterClaimPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool {
if object.GetName() == constants.HubClusterClaimName {
return false
}
clusterClaim, _ := getClusterClaim(context.Background(), mgr.GetClient(), constants.HubClusterClaimName)
return clusterClaim == nil
})
return ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller").
err := ctrl.NewControllerManagedBy(mgr).Named("hubclusterclaim-controller").
For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)).
Complete(&hubClusterClaimController{
client: mgr.GetClient(),
log: ctrl.Log.WithName("hubclusterclaim-controller"),
})
if err != nil {
return err
}
clusterClaimCtrlStarted = true
return nil
}

func updateHubClusterClaim(ctx context.Context, k8sClient client.Client,
12 changes: 11 additions & 1 deletion agent/pkg/controllers/version_clusterclaim_controller.go
@@ -20,6 +20,8 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/constants"
)

var clusterVersionCtrlStarted = false

type versionClusterClaimController struct {
client client.Client
log logr.Logger
@@ -46,16 +48,24 @@ func (c *versionClusterClaimController) Reconcile(ctx context.Context, request c
}

func AddVersionClusterClaimController(mgr ctrl.Manager) error {
if clusterVersionCtrlStarted {
return nil
}
clusterClaimPredicate, _ := predicate.LabelSelectorPredicate(metav1.LabelSelector{
MatchLabels: map[string]string{
constants.GlobalHubOwnerLabelKey: constants.GHAgentOwnerLabelValue,
},
})
return ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller").
err := ctrl.NewControllerManagedBy(mgr).Named("clusterclaim-controller").
For(&clustersv1alpha1.ClusterClaim{}, builder.WithPredicates(clusterClaimPredicate)).
Watches(&mchv1.MultiClusterHub{}, &handler.EnqueueRequestForObject{}).
Complete(&versionClusterClaimController{
client: mgr.GetClient(),
log: ctrl.Log.WithName("clusterclaim-controller"),
})
if err != nil {
return err
}
clusterVersionCtrlStarted = true
return nil
}