Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport connection secret #1045

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/jobs"
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
commonutils "github.com/stolostron/multicluster-global-hub/pkg/utils"
)

const (
Expand Down Expand Up @@ -88,7 +90,7 @@ func doMain(ctx context.Context, restConfig *rest.Config, agentConfig *config.Ag
go utils.StartDefaultPprofServer()
}

mgr, err := createManager(restConfig, agentConfig)
mgr, err := createManager(ctx, restConfig, agentConfig)
if err != nil {
setupLog.Error(err, "failed to create manager")
return 1
Expand Down Expand Up @@ -121,35 +123,16 @@ func parseFlags() *config.AgentConfig {
pflag.CommandLine.AddGoFlagSet(defaultFlags)

pflag.StringVar(&agentConfig.LeafHubName, "leaf-hub-name", "", "The name of the leaf hub.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server", "",
"The bootstrap server for kafka.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id", "",
"Producer Id for the kafka, default is the leaf hub name.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.StatusTopic, "kafka-producer-topic",
"event", "Topic for the kafka producer.")
pflag.IntVar(&agentConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-consumer-topic",
"spec", "Topic for the kafka consumer.")
pflag.StringVar(&agentConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID, "kafka-consumer-id",
"multicluster-global-hub-agent", "ID for the kafka consumer.")
pflag.StringVar(&agentConfig.PodNameSpace, "pod-namespace", constants.GHAgentNamespace,
"The agent running namespace, also used as leader election namespace")
pflag.StringVar(&agentConfig.TransportConfig.TransportType, "transport-type", "kafka",
"The transport type, 'kafka'")
pflag.IntVar(&agentConfig.SpecWorkPoolSize, "consumer-worker-pool-size", 10,
"The goroutine number to propagate the bundles on managed cluster.")
pflag.BoolVar(&agentConfig.SpecEnforceHohRbac, "enforce-hoh-rbac", false,
"enable hoh RBAC or not, default false")
pflag.StringVar(&agentConfig.TransportConfig.MessageCompressionType,
"transport-message-compression-type", "gzip",
"The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.IntVar(&agentConfig.StatusDeltaCountSwitchFactor,
"status-delta-count-switch-factor", 100,
"default with 100.")
Expand Down Expand Up @@ -199,7 +182,7 @@ func completeConfig(agentConfig *config.AgentConfig) error {
return nil
}

func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
func createManager(ctx context.Context, restConfig *rest.Config, agentConfig *config.AgentConfig) (
ctrl.Manager, error,
) {
leaseDuration := time.Duration(agentConfig.ElectionConfig.LeaseDuration) * time.Second
Expand Down Expand Up @@ -236,27 +219,51 @@ func createManager(restConfig *rest.Config, agentConfig *config.AgentConfig) (
if err != nil {
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create kubeclient: %w", err)
}
// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return nil, fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, restConfig, agentConfig); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return nil, fmt.Errorf("failed to add crd controller: %w", err)
transportCtrl := controller.NewTransportCtrl(
agentConfig.PodNameSpace,
constants.GHAgentTransportSecret,
agentConfig.TransportConfig.KafkaConfig,
transportCallback(mgr, agentConfig))
if err = transportCtrl.SetupWithManager(ctx, mgr, []string{
constants.GHAgentTransportSecret,
commonutils.AgentCertificateSecretName(),
}); err != nil {
return nil, err
}
setupLog.Info("add the transport controller to agent")

return mgr, nil
}

// if the transport consumer and producer is ready then the func will be invoked by the transport controller
func transportCallback(mgr ctrl.Manager, agentConfig *config.AgentConfig,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("failed to create kubeclient: %w", err)
}

// Need this controller to update the value of clusterclaim hub.open-cluster-management.io
// we use the value to decide whether install the ACM or not
if err := controllers.AddHubClusterClaimController(mgr); err != nil {
return fmt.Errorf("failed to add hub.open-cluster-management.io clusterclaim controller: %w", err)
}

if err := controllers.AddCRDController(mgr, mgr.GetConfig(), agentConfig); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

if err := controllers.AddCertController(mgr, kubeClient); err != nil {
return fmt.Errorf("failed to add crd controller: %w", err)
}

setupLog.Info("add the agent controllers to manager")
return nil
}
}

func initCache(config *rest.Config, cacheOpts cache.Options) (cache.Cache, error) {
cacheOpts.ByObject = map[client.Object]cache.ByObject{
&apiextensionsv1.CustomResourceDefinition{}: {
Expand Down
97 changes: 46 additions & 51 deletions manager/cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/stolostron/multicluster-global-hub/manager/pkg/backup"
"github.com/stolostron/multicluster-global-hub/manager/pkg/config"
managerconfig "github.com/stolostron/multicluster-global-hub/manager/pkg/config"
"github.com/stolostron/multicluster-global-hub/manager/pkg/cronjob"
"github.com/stolostron/multicluster-global-hub/manager/pkg/hubmanagement"
Expand All @@ -39,6 +40,7 @@ import (
commonobjects "github.com/stolostron/multicluster-global-hub/pkg/objects"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/controller"
"github.com/stolostron/multicluster-global-hub/pkg/transport/producer"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)
Expand Down Expand Up @@ -72,6 +74,7 @@ func parseFlags() *managerconfig.ManagerConfig {
DatabaseConfig: &managerconfig.DatabaseConfig{},
TransportConfig: &transport.TransportConfig{
KafkaConfig: &transport.KafkaConfig{
ConnCredential: &transport.ConnCredential{},
EnableTLS: true,
Topics: &transport.ClusterTopic{},
ProducerConfig: &transport.KafkaProducerConfig{},
Expand Down Expand Up @@ -111,32 +114,14 @@ func parseFlags() *managerconfig.ManagerConfig {
"transport-bridge-database-url", "", "The URL of database server for the transport-bridge user.")
pflag.StringVar(&managerConfig.TransportConfig.TransportType, "transport-type", "kafka",
"The transport type, 'kafka'.")
pflag.StringVar(&managerConfig.TransportConfig.MessageCompressionType, "transport-message-compression-type",
"gzip", "The message compression type for transport layer, 'gzip' or 'no-op'.")
pflag.DurationVar(&managerConfig.TransportConfig.CommitterInterval, "transport-committer-interval",
40*time.Second, "The committer interval for transport layer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.BootstrapServer, "kafka-bootstrap-server",
"kafka-kafka-bootstrap.kafka.svc:9092", "The bootstrap server for kafka.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClusterIdentity, "kafka-cluster-identity",
"", "The identity for kafka cluster.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.CaCertPath, "kafka-ca-cert-path", "",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientCertPath, "kafka-client-cert-path", "",
"The path of client certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ClientKeyPath, "kafka-client-key-path", "",
"The path of client key for kafka bootstrap server.")
pflag.StringVar(&managerConfig.DatabaseConfig.CACertPath, "postgres-ca-path", "/postgres-ca/ca.crt",
"The path of CA certificate for kafka bootstrap server.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.ProducerID, "kafka-producer-id",
"multicluster-global-hub-manager", "ID for the kafka producer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic, "kafka-producer-topic",
"spec", "Topic for the kafka producer.")
pflag.IntVar(&managerConfig.TransportConfig.KafkaConfig.ProducerConfig.MessageSizeLimitKB,
"kafka-message-size-limit", 940, "The limit for kafka message size in KB.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.ConsumerConfig.ConsumerID,
"kafka-consumer-id", "multicluster-global-hub-manager", "ID for the kafka consumer.")
pflag.StringVar(&managerConfig.TransportConfig.KafkaConfig.Topics.StatusTopic,
"kafka-consumer-topic", "event", "Topic for the kafka consumer.")
pflag.StringVar(&managerConfig.StatisticsConfig.LogInterval, "statistics-log-interval", "1m",
"The log interval for statistics.")
pflag.StringVar(&managerConfig.NonK8sAPIServerConfig.ClusterAPIURL, "cluster-api-url",
Expand Down Expand Up @@ -242,51 +227,61 @@ func createManager(ctx context.Context,
return nil, fmt.Errorf("failed to create a new manager: %w", err)
}

producer, err := producer.NewGenericProducer(managerConfig.TransportConfig,
managerConfig.TransportConfig.KafkaConfig.Topics.SpecTopic)
if err != nil {
return nil, fmt.Errorf("failed to init spec transport bridge: %w", err)
}

// TODO: refactor the manager to start the conflation manager so that it can handle the events from restful API

if !managerConfig.WithACM {
return mgr, nil
transportCtrl := controller.NewTransportCtrl(
managerConfig.ManagerNamespace,
constants.GHManagerTransportSecret,
managerConfig.TransportConfig.KafkaConfig,
transportCallback(ctx, mgr, managerConfig, sqlConn))
if err = transportCtrl.SetupWithManager(ctx, mgr, []string{constants.GHManagerTransportSecret}); err != nil {
return nil, err
}
setupLog.Info("add the transport controller to manager")
return mgr, nil
}

if managerConfig.EnableGlobalResource {
if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil {
return nil, fmt.Errorf("failed to add non-k8s-api-server: %w", err)
// if the transport consumer and producer is ready then the func will be invoked by the transport controller
func transportCallback(ctx context.Context, mgr ctrl.Manager, managerConfig *config.ManagerConfig,
sqlConn *sql.Conn,
) controller.TransportCallback {
return func(producer transport.Producer, consumer transport.Consumer) error {
if !managerConfig.WithACM {
return nil
}
}

if managerConfig.EnableGlobalResource {
if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil {
return nil, fmt.Errorf("failed to add global resource spec syncers: %w", err)
if managerConfig.EnableGlobalResource {
if err := nonk8sapi.AddNonK8sApiServer(mgr, managerConfig.NonK8sAPIServerConfig); err != nil {
return fmt.Errorf("failed to add non-k8s-api-server: %w", err)
}
if err := specsyncer.AddGlobalResourceSpecSyncers(mgr, managerConfig, producer); err != nil {
return fmt.Errorf("failed to add global resource spec syncers: %w", err)
}
}
}

if err := statussyncer.AddStatusSyncers(mgr, managerConfig); err != nil {
return nil, fmt.Errorf("failed to add transport-to-db syncers: %w", err)
}
if err := statussyncer.AddStatusSyncers(mgr, consumer, managerConfig); err != nil {
return fmt.Errorf("failed to add transport-to-db syncers: %w", err)
}

// add hub management
if err := hubmanagement.AddHubManagement(mgr, producer); err != nil {
return nil, fmt.Errorf("failed to add hubmanagement to manager - %w", err)
}
// add hub management
if err := hubmanagement.AddHubManagement(mgr, producer); err != nil {
return fmt.Errorf("failed to add hubmanagement to manager - %w", err)
}

// need lock DB for backup
backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn)
err = backupPVC.SetupWithManager(mgr)
if err != nil {
return nil, err
}
// need lock DB for backup
backupPVC := backup.NewBackupPVCReconciler(mgr, sqlConn)
err := backupPVC.SetupWithManager(mgr)
if err != nil {
return err
}

if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil {
return nil, fmt.Errorf("failed to add scheduler to manager: %w", err)
}
if err := cronjob.AddSchedulerToManager(ctx, mgr, managerConfig, enableSimulation); err != nil {
return fmt.Errorf("failed to add scheduler to manager: %w", err)
}

return mgr, nil
setupLog.Info("add the manager controllers to ctrl.Manager")
return nil
}
}

// function to handle defers with exit, see https://stackoverflow.com/a/27629493/553720.
Expand Down
15 changes: 1 addition & 14 deletions manager/pkg/statussyncer/dispatcher/transport_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
genericconsumer "github.com/stolostron/multicluster-global-hub/pkg/transport/consumer"
)

// Get message from transport, convert it to bundle and forward it to conflation manager.
Expand All @@ -22,21 +21,9 @@ type TransportDispatcher struct {
statistic *statistics.Statistics
}

func AddTransportDispatcher(mgr ctrl.Manager, managerConfig *config.ManagerConfig,
func AddTransportDispatcher(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig,
conflationManager *conflator.ConflationManager, stats *statistics.Statistics,
) error {
// start a consumer
topics := managerConfig.TransportConfig.KafkaConfig.Topics
consumer, err := genericconsumer.NewGenericConsumer(managerConfig.TransportConfig,
[]string{topics.StatusTopic},
genericconsumer.EnableDatabaseOffset(true))
if err != nil {
return fmt.Errorf("failed to initialize transport consumer: %w", err)
}
if err := mgr.Add(consumer); err != nil {
return fmt.Errorf("failed to add transport consumer to manager: %w", err)
}

transportDispatcher := &TransportDispatcher{
log: ctrl.Log.WithName("conflation-dispatcher"),
consumer: consumer,
Expand Down
5 changes: 3 additions & 2 deletions manager/pkg/statussyncer/syncers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/dispatcher"
dbsyncer "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/syncers"
"github.com/stolostron/multicluster-global-hub/pkg/statistics"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
)

// AddStatusSyncers performs the initial setup required before starting the runtime manager.
// adds controllers and/or runnables to the manager, registers handler to conflation manager
func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) error {
func AddStatusSyncers(mgr ctrl.Manager, consumer transport.Consumer, managerConfig *config.ManagerConfig) error {
// create statistics
stats := statistics.NewStatistics(managerConfig.StatisticsConfig)
if err := mgr.Add(stats); err != nil {
Expand All @@ -26,7 +27,7 @@ func AddStatusSyncers(mgr ctrl.Manager, managerConfig *config.ManagerConfig) err
registerHandler(conflationManager, managerConfig.EnableGlobalResource)

// start consume message from transport to conflation manager
if err := dispatcher.AddTransportDispatcher(mgr, managerConfig, conflationManager, stats); err != nil {
if err := dispatcher.AddTransportDispatcher(mgr, consumer, managerConfig, conflationManager, stats); err != nil {
return err
}

Expand Down
1 change: 0 additions & 1 deletion operator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ const (
// global hub agent constants
const (
GHClusterManagementAddonName = "multicluster-global-hub-controller"
GHManagedClusterAddonName = "multicluster-global-hub-controller"
)

// global hub names
Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/controllers/addon/addon_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-logr/logr"
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
mchv1 "github.com/stolostron/multiclusterhub-operator/api/v1"
certificatesv1 "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -25,7 +26,6 @@ import (

globalhubv1alpha4 "github.com/stolostron/multicluster-global-hub/operator/apis/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
"github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/addon/certificates"
"github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
Expand Down
Loading
Loading