Skip to content

Commit

Permalink
replace transport connection from flag to secret
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>

update

Signed-off-by: myan <[email protected]>

add agent

Signed-off-by: myan <[email protected]>

int

Signed-off-by: myan <[email protected]>

fix the it

Signed-off-by: myan <[email protected]>

d

Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa committed Aug 13, 2024
1 parent 85c88cb commit fc71701
Show file tree
Hide file tree
Showing 38 changed files with 754 additions and 525 deletions.
79 changes: 43 additions & 36 deletions agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/jobs"
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
commonutils "github.com/stolostron/multicluster-global-hub/pkg/utils"
)

const (
Expand Down Expand Up @@ -88,7 +90,7 @@ func doMain(ctx context.Context, restConfig *rest.Config, agentConfig *config.Ag
go utils.StartDefaultPprofServer()
}

mgr, err := createManager(restConfig, agentConfig)
mgr, err := createManager(ctx, restConfig, agentConfig)
if err != nil {
setupLog.Error(err, "failed to create manager")
return 1
Expand Down Expand Up @@ -121,35 +123,16 @@ func parseFlags() *config.AgentConfig {
pflag.CommandLine.AddGoFlagSet(defaultFlags)

pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "",
"The bootstrap server for kafka.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "",
"Producer Id for the kafka, default is the leaf hub name.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic",
"event", "Topic for the kafka producer.")
pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic",
"spec", "Topic for the kafka consumer.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id",
"multicluster-global-hub-agent", "ID for the kafka consumer.")
pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace,
"The agent running namespace, also used as leader election namespace")
pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka",
"The transport type, 'kafka'")
pflag.IntVar(&agentConfig.SpecWorkPoolSize, "consumer-worker-pool-size", 10,
"The goroutine number to propagate the bundles on managed cluster.")
pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false,
"enable hoh RBAC or not, default false")
pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType,
"transport-message-compression-type", "gzip",
"The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor,
"status-delta-count-switch-factor", 100,
"default with 100.")
Expand Down Expand Up @@ -199,7 +182,7 @@ func completeConfig(agentConfig *config.AgentConfig) error {
return nil
}

func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
func createManager(ctx context.Context, restConfig *rest.Config, agentConfig *config.AgentConfig) (
ctrl.Manager, error,
) {
leaseDuration := time.Duration(agentConfig.ElectionConfig.LeaseDuration) * time.Second
Expand Down Expand Up @@ -236,27 +219,51 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
if err != nil {
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kubeclient: %w", err)
}
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
transportCtrl := controller.NewTransportCtrl(
agentConfig.PodNameSpace,
constants.GHAgentTransportSecret,
agentConfig.TransportConfig.KafkaConfig,
transportCallback(mgr, agentConfig))
if err = transportCtrl.SetupWithManager(ctx, mgr, []string{
constants.GHAgentTransportSecret,
commonutils.AgentCertificateSecretName(),
}); err != nil {
return nil, err
}
setupLog.Info("add the transport controller to agent")

return mgr, nil
}

// if the transport consumer and producer is ready then the func will be invoked by the transport controller
func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("failed to create kubeclient: %w", err)
}

// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

setupLog.Info("add the agent controllers to manager")
return nil
}
}

func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) {
cacheOpts.ByObject = map[client.Object]cache.ByObject{
&apiextensionsv1.CustomResourceDefinition{}: {
Expand Down
97 changes: 46 additions & 51 deletions manager/cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/stolostron/multicluster-global-hub/manager/pkg/backup"
"github.com/stolostron/multicluster-global-hub/manager/pkg/config"
managerconfig "github.com/stolostron/multicluster-global-hub/manager/pkg/config"
"github.com/stolostron/multicluster-global-hub/manager/pkg/cronjob"
"github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement"
Expand All @@ -39,6 +40,7 @@ import (
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)
Expand Down Expand Up @@ -72,6 +74,7 @@ func parseFlags() *managerconfig.ManagerConfig {
DatabaseConfig: &managerconfig.DatabaseConfig{},
TransportConfig: &transport.TransportConfig{
KafkaConfig: &transport.KafkaConfig{
ConnCredential: &transport.ConnCredential{},
EnableTLS: true,
Topics: &transport.ClusterTopic{},
ProducerConfig: &transport.KafkaProducerConfig{},
Expand Down Expand Up @@ -111,32 +114,14 @@ func parseFlags() *managerconfig.ManagerConfig {
"transport-bridge-database-url", "", "The URL of database server for the transport-bridge user.")
pflag.StringVar(&managerConfig.TransportConfig.TransportType, "transport-type", "kafka",
"The transport type, 'kafka'.")
pflag.StringVar(&managerConfig.TransportConfig.MessageCompressionType, "transport-message-compression-type",
"gzip", "The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.DurationVar(&managerConfig.TransportConfig.CommitterInterval, "transport-committer-interval",
40*time.Second, "The committer interval for transport layer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server",
"kafka-kafka-bootstrap.kafka.svc:9092", "The bootstrap server for kafka.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClusterIdentity, "kafka-cluster-identity",
"", "The identity for kafka cluster.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&managerConfig.DatabaseConfig.CACertPath, "postgres-ca-path", "/postgres-ca/ca.crt",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id",
"multicluster-global-hub-manager", "ID for the kafka producer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-producer-topic",
"spec", "Topic for the kafka producer.")
pflag.IntVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID,
"kafka-consumer-id", "multicluster-global-hub-manager", "ID for the kafka consumer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.StatusTopic,
"kafka-consumer-topic", "event", "Topic for the kafka consumer.")
pflag.StringVar(&managerConfig.StatisticsConfig.LogInterval, "statistics-log-interval", "1m",
"The log interval for statistics.")
pflag.StringVar(&managerConfig.NonK8sAPIServerConfig.ClusterAPIURL, "cluster-api-url",
Expand Down Expand Up @@ -242,51 +227,61 @@ func createManager(ctx context.Context,
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}

producer, err := producer.NewGenericProducer(managerConfig.TransportConfig,
managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic)
if err != nil {
return nil, fmt.Errorf("failed to init spec transport bridge: %w", err)
}

// TODO: refactor the manager to start the conflation manager so that it can handle the events from restful API

if !managerConfig.WithACM {
return mgr, nil
transportCtrl := controller.NewTransportCtrl(
managerConfig.ManagerNamespace,
constants.GHManagerTransportSecret,
managerConfig.TransportConfig.KafkaConfig,
transportCallback(ctx, mgr, managerConfig, sqlConn))
if err = transportCtrl.SetupWithManager(ctx, mgr, []string{constants.GHManagerTransportSecret}); err != nil {
return nil, err
}
setupLog.Info("add the transport controller to manager")
return mgr, nil
}

if managerConfig.EnableGlobalResource {
if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil {
return nil, fmt.Errorf("failed to add non-k8s-api-server: %w", err)
// if the transport consumer and producer is ready then the func will be invoked by the transport controller
func transportCallback(ctx context.Context, mgr ctrl.Manager, managerConfig *config.ManagerConfig,
sqlConn *sql.Conn,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
if !managerConfig.WithACM {
return nil
}
}

if managerConfig.EnableGlobalResource {
if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil {
return nil, fmt.Errorf("failed to add global resource spec syncers: %w", err)
if managerConfig.EnableGlobalResource {
if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil {
return fmt.Errorf("failed to add non-k8s-api-server: %w", err)
}
if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil {
return fmt.Errorf("failed to add global resource spec syncers: %w", err)
}
}
}

if err := statussyncer.AddStatusSyncers(mgr, managerConfig); err != nil {
return nil, fmt.Errorf("failed to add transport-to-db syncers: %w", err)
}
if err := statussyncer.AddStatusSyncers(mgr, consumer, managerConfig); err != nil {
return fmt.Errorf("failed to add transport-to-db syncers: %w", err)
}

// add hub management
if err := hubmanagement.AddHubManagement(mgr, producer); err != nil {
return nil, fmt.Errorf("failed to add hubmanagement to manager - %w", err)
}
// add hub management
if err := hubmanagement.AddHubManagement(mgr, producer); err != nil {
return fmt.Errorf("failed to add hubmanagement to manager - %w", err)
}

// need lock DB for backup
backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn)
err = backupPVC.SetupWithManager(mgr)
if err != nil {
return nil, err
}
// need lock DB for backup
backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn)
err := backupPVC.SetupWithManager(mgr)
if err != nil {
return err
}

if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil {
return nil, fmt.Errorf("failed to add scheduler to manager: %w", err)
}
if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil {
return fmt.Errorf("failed to add scheduler to manager: %w", err)
}

return mgr, nil
setupLog.Info("add the manager controllers to ctrl.Manager")
return nil
}
}

// function to handle defers with exit, see https://stackoverflow.com/a/27629493/553720.
Expand Down
15 changes: 1 addition & 14 deletions manager/pkg/statussyncer/dispatcher/transport_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer"
)

// Get message from transport, convert it to bundle and forward it to conflation manager.
Expand All @@ -22,21 +21,9 @@ type TransportDispatcher struct {
statistic *statistics.Statistics
}

func AddTransportDispatcher(mgr ctrl.Manager, managerConfig *config.ManagerConfig,
func AddTransportDispatcher(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig,
conflationManager *conflator.ConflationManager, stats *statistics.Statistics,
) error {
// start a consumer
topics := managerConfig.TransportConfig.KafkaConfig.Topics
consumer, err := genericconsumer.NewGenericConsumer(managerConfig.TransportConfig,
[]string{topics.StatusTopic},
genericconsumer.EnableDatabaseOffset(true))
if err != nil {
return fmt.Errorf("failed to initialize transport consumer: %w", err)
}
if err := mgr.Add(consumer); err != nil {
return fmt.Errorf("failed to add transport consumer to manager: %w", err)
}

transportDispatcher := &TransportDispatcher{
log: ctrl.Log.WithName("conflation-dispatcher"),
consumer: consumer,
Expand Down
5 changes: 3 additions & 2 deletions manager/pkg/statussyncer/syncers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/dispatcher"
dbsyncer "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/syncers"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

// AddStatusSyncers performs the initial setup required before starting the runtime manager.
// adds controllers and/or runnables to the manager, registers handler to conflation manager
func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) error {
func AddStatusSyncers(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig) error {
// create statistics
stats := statistics.NewStatistics(managerConfig.StatisticsConfig)
if err := mgr.Add(stats); err != nil {
Expand All @@ -26,7 +27,7 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) err
registerHandler(conflationManager, managerConfig.EnableGlobalResource)

// start consume message from transport to conflation manager
if err := dispatcher.AddTransportDispatcher(mgr, managerConfig, conflationManager, stats); err != nil {
if err := dispatcher.AddTransportDispatcher(mgr, consumer, managerConfig, conflationManager, stats); err != nil {
return err
}

Expand Down
1 change: 0 additions & 1 deletion operator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ const (
// global hub agent constants
const (
GHClusterManagementAddonName = "multicluster-global-hub-controller"
GHManagedClusterAddonName = "multicluster-global-hub-controller"
)

// global hub names
Expand Down
6 changes: 3 additions & 3 deletions operator/pkg/controllers/addon/addon_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates"
"github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
"github.com/stolostron/multicluster-global-hub/pkg/constants"
)

// +kubebuilder:rbac:groups=cluster.open-cluster-management.io,resources=managedclusters,verbs=get;list;watch
Expand Down Expand Up @@ -112,7 +112,7 @@ func (a *AddonController) Start(ctx context.Context) error {
}

agentAddon, err := addonfactory.NewAgentAddonFactory(
operatorconstants.GHManagedClusterAddonName, FS, "manifests").
constants.GHManagedClusterAddonName, FS, "manifests").
WithAgentHostedModeEnabledOption().
WithGetValuesFuncs(hohAgentAddon.GetValues,
addonfactory.GetValuesFromAddonAnnotation,
Expand All @@ -121,7 +121,7 @@ func (a *AddonController) Start(ctx context.Context) error {
addonfactory.ToAddOnDeloymentConfigValues,
addonfactory.ToAddOnCustomizedVariableValues,
)).
WithAgentRegistrationOption(newRegistrationOption(operatorconstants.GHManagedClusterAddonName)).
WithAgentRegistrationOption(newRegistrationOption(constants.GHManagedClusterAddonName)).
WithScheme(addonScheme).
BuildTemplateAgentAddon()
if err != nil {
Expand Down
Loading

0 comments on commit fc71701

Please sign in to comment.