Skip to content

Commit

Permalink
Build all hub controller as a single controller
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Qiu <[email protected]>
  • Loading branch information
qiujian16 committed Nov 4, 2024
1 parent 26f9e74 commit 37c23ec
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/registration-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
func newNucleusCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "registration-operator",
Short: "Nucleus Operator",
Short: "Registration Operator",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
os.Exit(1)
Expand All @@ -51,6 +51,7 @@ func newNucleusCommand() *cobra.Command {
}

cmd.AddCommand(hub.NewHubOperatorCmd())
cmd.AddCommand(hub.NewHubManagerCmd())
cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
cmd.AddCommand(spoke.NewKlusterletAgentCmd())

Expand Down
27 changes: 27 additions & 0 deletions pkg/cmd/hub/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"context"

"github.com/spf13/cobra"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

ocmfeature "open-cluster-management.io/api/feature"

commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
singletonhub "open-cluster-management.io/ocm/pkg/singleton/hub"
"open-cluster-management.io/ocm/pkg/version"
)

Expand All @@ -30,3 +35,25 @@ func NewHubOperatorCmd() *cobra.Command {
opts.AddFlags(flags)
return cmd
}

// NewHubManagerCmd is to start the singleton manager including registration/work/placement/addon
func NewHubManagerCmd() *cobra.Command {
opts := commonoptions.NewOptions()
hubOpts := singletonhub.NewOption()

cmdConfig := opts.
NewControllerCommandConfig("ocm-controller", version.Get(), hubOpts.RunControllerManager)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "manager"
cmd.Short = "Start the ocm manager"

flags := cmd.Flags()

opts.AddFlags(flags)

utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubWorkFeatureGates))
utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubRegistrationFeatureGates))
utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubAddonManagerFeatureGates))
features.HubMutableFeatureGate.AddFlag(flags)
return cmd
}
10 changes: 5 additions & 5 deletions pkg/registration/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
workclient "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"

Expand Down Expand Up @@ -86,7 +86,7 @@ func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controller
return err
}

workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
workClient, err := workclient.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
Expand All @@ -98,7 +98,7 @@ func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controller

clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
workInformers := workinformers.NewSharedInformerFactory(workClient, 30*time.Minute)
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
// Note all kube resources managed by registration should have the cluster label, and should not have
Expand Down Expand Up @@ -137,7 +137,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
kubeInformers kubeinformers.SharedInformerFactory,
clusterInformers clusterv1informers.SharedInformerFactory,
clusterProfileInformers cpinformerv1alpha1.SharedInformerFactory,
workInformers workv1informers.SharedInformerFactory,
workInformers workinformers.SharedInformerFactory,
addOnInformers addoninformers.SharedInformerFactory,
) error {
csrApprover, err := csr.NewCSRApprover(kubeClient, kubeInformers, m.ClusterAutoApprovalUsers, controllerContext.EventRecorder)
Expand Down
215 changes: 215 additions & 0 deletions pkg/singleton/hub/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package hub

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"

addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"

"open-cluster-management.io/ocm/pkg/addon"
"open-cluster-management.io/ocm/pkg/features"
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
workhub "open-cluster-management.io/ocm/pkg/work/hub"
)

const sourceID = "ocm-controller"

type Option struct {
RegistrationOption *registrationhub.HubManagerOptions
WorkOption *workhub.WorkHubManagerOptions
}

func NewOption() *Option {
return &Option{
RegistrationOption: registrationhub.NewHubManagerOptions(),
WorkOption: workhub.NewWorkHubManagerOptions(),
}
}

func (o *Option) AddFlags(fs *pflag.FlagSet) {
o.RegistrationOption.AddFlags(fs)
o.WorkOption.AddFlags(fs)
}

func (o *Option) RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

metadataClient, err := metadata.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
// Note all kube resources managed by registration should have the cluster label, and should not have
// the addon label.
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: clusterv1.ClusterNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
{
Key: addonv1alpha1.AddonLabelKey,
Operator: metav1.LabelSelectorOpDoesNotExist,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
}))
addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)

var workClient workclientset.Interface
var watcherStore *store.SourceInformerWatcherStore

if o.WorkOption.WorkDriver == "kube" {
config := controllerContext.KubeConfig
if o.WorkOption.WorkDriverConfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", o.WorkOption.WorkDriverConfig)
if err != nil {
return err
}
}

workClient, err = workclientset.NewForConfig(config)
if err != nil {
return err
}
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

watcherStore = store.NewSourceInformerWatcherStore(ctx)

_, config, err := generic.NewConfigLoader(o.WorkOption.WorkDriver, o.WorkOption.WorkDriverConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(o.WorkOption.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {
return err
}

workClient = clientHolder.WorkInterface()
}

workInformers := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute)
informer := workInformers.Work().V1().ManifestWorks()

// For cloudevents work client, we use the informer store as the client store
if watcherStore != nil {
watcherStore.SetStore(informer.Informer().GetStore())
}

// start registration component
go func() {
err := o.RegistrationOption.RunControllerManagerWithInformers(
ctx, controllerContext,
kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers,
)
if err != nil {
klog.Fatal(err)
}
}()

// start placement component
go func() {
err := placementcontrollers.RunControllerManagerWithInformers(
ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
if err != nil {
klog.Fatal(err)
}
}()

// start work component
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
// build a hub work client for ManifestWorkReplicaSets
replicaSetsClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
go func() {
err := workhub.RunControllerManagerWithInformers(
ctx, controllerContext, replicaSetsClient, workClient, informer, clusterInformers)
if err != nil {
klog.Fatal(err)
}
}()
}

// start addon component
if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 10*time.Minute)
go func() {
err := addon.RunControllerManagerWithInformers(
ctx, controllerContext, kubeClient, addOnClient, workClient,
clusterInformers, addOnInformers, workInformers, dynamicInformers,
)
if err != nil {
klog.Fatal(err)
}
}()
}

<-ctx.Done()
return nil
}

0 comments on commit 37c23ec

Please sign in to comment.