Skip to content

Commit

Permalink
✨ Cluster decorator interface (#759)
Browse files Browse the repository at this point in the history
* Add cluster decorator interface in register

And refactor creating to controller to call decorators

Signed-off-by: Jian Qiu <[email protected]>

* Add aws annotations to ManagedCluster using Decorator

Signed-off-by: Gaurav Jaswal <[email protected]>

* Addressing review comments

Signed-off-by: Gaurav Jaswal <[email protected]>

---------

Signed-off-by: Jian Qiu <[email protected]>
Signed-off-by: Gaurav Jaswal <[email protected]>
Co-authored-by: Jian Qiu <[email protected]>
  • Loading branch information
jaswalkiranavtar and qiujian16 authored Dec 11, 2024
1 parent 75c9b8a commit b170f3a
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 77 deletions.
27 changes: 23 additions & 4 deletions pkg/registration/register/aws_irsa/aws_irsa.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"

clusterv1 "open-cluster-management.io/api/cluster/v1"
operatorv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/registration/register"
)

Expand All @@ -22,11 +25,15 @@ const (
// TLSKeyFile is the name of tls key file in kubeconfigSecret
TLSKeyFile = "tls.key"
// TLSCertFile is the name of the tls cert file in kubeconfigSecret
TLSCertFile = "tls.crt"
TLSCertFile = "tls.crt"
ManagedClusterArn = "managed-cluster-arn"
ManagedClusterIAMRoleSuffix = "managed-cluster-iam-role-suffix"
)

type AWSIRSADriver struct {
name string
name string
managedClusterArn string
managedClusterRoleSuffix string
}

func (c *AWSIRSADriver) Process(
Expand Down Expand Up @@ -95,6 +102,18 @@ func (c *AWSIRSADriver) IsHubKubeConfigValid(ctx context.Context, secretOption r
return true, nil
}

func NewAWSIRSADriver() register.RegisterDriver {
return &AWSIRSADriver{}
func (c *AWSIRSADriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterArn] = c.managedClusterArn
cluster.Annotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterIAMRoleSuffix] = c.managedClusterRoleSuffix
return cluster
}

func NewAWSIRSADriver(managedClusterArn string, managedClusterRoleSuffix string) register.RegisterDriver {
return &AWSIRSADriver{
managedClusterArn: managedClusterArn,
managedClusterRoleSuffix: managedClusterRoleSuffix,
}
}
2 changes: 1 addition & 1 deletion pkg/registration/register/aws_irsa/aws_irsa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestIsHubKubeConfigValidFunc(t *testing.T) {
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
driver := NewAWSIRSADriver()
driver := NewAWSIRSADriver("", "")
secretOption := register.SecretOption{
ClusterName: c.clusterName,
AgentName: c.agentName,
Expand Down
6 changes: 6 additions & 0 deletions pkg/registration/register/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/client-go/util/keyutil"
"k8s.io/klog/v2"

clusterv1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/registration/register"
)

Expand Down Expand Up @@ -266,6 +268,10 @@ func (c *CSRDriver) IsHubKubeConfigValid(ctx context.Context, secretOption regis
return isCertificateValid(logger, certData, nil)
}

func (c *CSRDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
return cluster
}

func NewCSRDriver() register.RegisterDriver {
return &CSRDriver{}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/registration/register/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type RegisterDriver interface {
// InformerHandler returns informer of the related object. If no object needs to be watched, the func could
// return nil, nil.
InformerHandler(option any) (cache.SharedIndexInformer, factory.EventFilterFunc)

// ManagedClusterDecorator is to change managed cluster metadata or spec during registration process.
ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster
}

// Approvers is the inteface that each driver should implement on hub side. The hub controller will use this driver
Expand Down
8 changes: 7 additions & 1 deletion pkg/registration/register/secret_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

clusterv1 "open-cluster-management.io/api/cluster/v1"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
Expand Down Expand Up @@ -133,7 +135,7 @@ func TestSync(t *testing.T) {
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
syncCtx := testingcommon.NewFakeSyncContext(t, "test")
kubeClient := kubefake.NewSimpleClientset(c.secrets...)
kubeClient := kubefake.NewClientset(c.secrets...)
c.option.ManagementCoreClient = kubeClient.CoreV1()
informerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
c.option.ManagementSecretInformer = informerFactory.Core().V1().Secrets().Informer()
Expand Down Expand Up @@ -195,3 +197,7 @@ func (f *fakeDriver) Process(
func (f *fakeDriver) InformerHandler(_ any) (cache.SharedIndexInformer, factory.EventFilterFunc) {
return nil, nil
}

func (f *fakeDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
return cluster
}
103 changes: 55 additions & 48 deletions pkg/registration/spoke/registration/creating_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,26 @@ var (
CreatingControllerSyncInterval = 60 * time.Minute
)

type ManagedClusterDecorator func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster

// managedClusterCreatingController creates a ManagedCluster on hub cluster during the spoke agent bootstrap phase
type managedClusterCreatingController struct {
clusterName string
spokeExternalServerURLs []string
spokeCABundle []byte
clusterAnnotations map[string]string
hubClusterClient clientset.Interface
clusterName string
clusterDecorators []ManagedClusterDecorator
hubClusterClient clientset.Interface
}

// NewManagedClusterCreatingController creates a new managedClusterCreatingController on the managed cluster.
func NewManagedClusterCreatingController(
clusterName string, spokeExternalServerURLs []string, annotations map[string]string,
spokeCABundle []byte,
clusterName string,
decorators []ManagedClusterDecorator,
hubClusterClient clientset.Interface,
recorder events.Recorder) factory.Controller {

c := &managedClusterCreatingController{
clusterName: clusterName,
spokeExternalServerURLs: spokeExternalServerURLs,
spokeCABundle: spokeCABundle,
clusterAnnotations: commonhelpers.FilterClusterAnnotations(annotations),
hubClusterClient: hubClusterClient,
clusterName: clusterName,
hubClusterClient: hubClusterClient,
clusterDecorators: decorators,
}

return factory.New().
Expand All @@ -69,20 +67,12 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac
if errors.IsNotFound(err) {
managedCluster := &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.clusterName,
Annotations: c.clusterAnnotations,
Name: c.clusterName,
},
}

if len(c.spokeExternalServerURLs) != 0 {
var managedClusterClientConfigs []clusterv1.ClientConfig
for _, serverURL := range c.spokeExternalServerURLs {
managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: c.spokeCABundle,
})
}
managedCluster.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs
for _, decorator := range c.clusterDecorators {
managedCluster = decorator(managedCluster)
}

_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Create(ctx, managedCluster, metav1.CreateOptions{})
Expand All @@ -94,37 +84,17 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac
return nil
}

// do not update ManagedClusterClientConfigs in ManagedCluster if spokeExternalServerURLs is empty
if len(c.spokeExternalServerURLs) == 0 {
return nil
managedCluster := existingCluster.DeepCopy()
for _, decorator := range c.clusterDecorators {
managedCluster = decorator(managedCluster)
}

// merge ClientConfig
managedClusterClientConfigs := existingCluster.Spec.ManagedClusterClientConfigs
for _, serverURL := range c.spokeExternalServerURLs {
isIncludeByExisting := false
for _, existingClientConfig := range existingCluster.Spec.ManagedClusterClientConfigs {
if serverURL == existingClientConfig.URL {
isIncludeByExisting = true
break
}
}

if !isIncludeByExisting {
managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: c.spokeCABundle,
})
}
}
if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedClusterClientConfigs) {
if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedCluster.Spec.ManagedClusterClientConfigs) {
return nil
}

// update ManagedClusterClientConfigs in ManagedCluster
clusterCopy := existingCluster.DeepCopy()
clusterCopy.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs
_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, clusterCopy, metav1.UpdateOptions{})
_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{})
// ManagedClusterClientConfigs in ManagedCluster is only allowed updated during bootstrap.
// After bootstrap secret expired, an unauthorized error will be got, skip it
if skipUnauthorizedError(err) != nil {
Expand All @@ -141,3 +111,40 @@ func skipUnauthorizedError(err error) error {

return err
}

func AnnotationDecorator(annotations map[string]string) ManagedClusterDecorator {
return func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
filteredAnnotations := commonhelpers.FilterClusterAnnotations(annotations)
if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
for key, value := range filteredAnnotations {
cluster.Annotations[key] = value
}
return cluster
}
}

// ClientConfigDecorator merge ClientConfig
func ClientConfigDecorator(externalServerURLs []string, caBundle []byte) ManagedClusterDecorator {
return func(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
for _, serverURL := range externalServerURLs {
isIncludeByExisting := false
for _, existingClientConfig := range cluster.Spec.ManagedClusterClientConfigs {
if serverURL == existingClientConfig.URL {
isIncludeByExisting = true
break
}
}

if !isIncludeByExisting {
cluster.Spec.ManagedClusterClientConfigs = append(
cluster.Spec.ManagedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: caBundle,
})
}
}
return cluster
}
}
13 changes: 7 additions & 6 deletions pkg/registration/spoke/registration/creating_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ func TestCreateSpokeCluster(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
ctrl := managedClusterCreatingController{
clusterName: testinghelpers.TestManagedClusterName,
spokeExternalServerURLs: []string{testSpokeExternalServerUrl},
spokeCABundle: []byte("testcabundle"),
hubClusterClient: clusterClient,
clusterAnnotations: map[string]string{
"agent.open-cluster-management.io/test": "true",
clusterName: testinghelpers.TestManagedClusterName,
clusterDecorators: []ManagedClusterDecorator{
AnnotationDecorator(map[string]string{
"agent.open-cluster-management.io/test": "true",
}),
ClientConfigDecorator([]string{testSpokeExternalServerUrl}, []byte("testcabundle")),
},
hubClusterClient: clusterClient,
}

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
Expand Down
26 changes: 9 additions & 17 deletions pkg/registration/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"
operatorv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/common/helpers"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
Expand Down Expand Up @@ -191,20 +190,9 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,

// initiate registration driver
var registerDriver register.RegisterDriver
if o.registrationOption.RegistrationAuth == AwsIrsaAuthType {
// TODO: may consider add additional validations
if o.registrationOption.HubClusterArn != "" {
registerDriver = awsIrsa.NewAWSIRSADriver()
if o.registrationOption.ClusterAnnotations == nil {
o.registrationOption.ClusterAnnotations = map[string]string{}
}
o.registrationOption.ClusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/managed-cluster-arn"] = o.registrationOption.ManagedClusterArn
o.registrationOption.ClusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/managed-cluster-iam-role-suffix"] =
o.registrationOption.ManagedClusterRoleSuffix

} else {
panic("A valid EKS Hub Cluster ARN is required with awsirsa based authentication")
}
var registrationOption = o.registrationOption
if registrationOption.RegistrationAuth == AwsIrsaAuthType {
registerDriver = awsIrsa.NewAWSIRSADriver(o.registrationOption.ManagedClusterArn, o.registrationOption.ManagedClusterRoleSuffix)
} else {
registerDriver = csr.NewCSRDriver()
}
Expand Down Expand Up @@ -254,8 +242,12 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,

// start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster
spokeClusterCreatingController := registration.NewManagedClusterCreatingController(
o.agentOptions.SpokeClusterName, o.registrationOption.SpokeExternalServerURLs, o.registrationOption.ClusterAnnotations,
spokeClusterCABundle,
o.agentOptions.SpokeClusterName,
[]registration.ManagedClusterDecorator{
registration.AnnotationDecorator(o.registrationOption.ClusterAnnotations),
registration.ClientConfigDecorator(o.registrationOption.SpokeExternalServerURLs, spokeClusterCABundle),
o.driver.ManagedClusterDecorator,
},
bootstrapClusterClient,
recorder,
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/registration/spoke/spokeagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func init() {
func TestValidate(t *testing.T) {
defaultCompletedOptions := NewSpokeAgentOptions()
defaultCompletedOptions.BootstrapKubeconfig = "/spoke/bootstrap/kubeconfig"
awsCompletedOptionsHubArnMissing := *defaultCompletedOptions
awsCompletedOptionsHubArnMissing.RegistrationAuth = AwsIrsaAuthType
awsDefaultCompletedOptions := awsCompletedOptionsHubArnMissing
awsDefaultCompletedOptions.HubClusterArn = "arn:aws:eks:us-west-2:123456789012:cluster/hub-cluster1"

cases := []struct {
name string
Expand Down Expand Up @@ -78,6 +82,16 @@ func TestValidate(t *testing.T) {
options: defaultCompletedOptions,
expectedErr: "",
},
{
name: "default completed options for aws flow",
options: &awsDefaultCompletedOptions,
expectedErr: "",
},
{
name: "default completed options without HubClusterArn for aws flow",
options: &awsCompletedOptionsHubArnMissing,
expectedErr: "EksHubClusterArn cannot be empty if RegistrationAuth is awsirsa",
},
{
name: "default completed options",
options: &SpokeAgentOptions{
Expand Down
Loading

0 comments on commit b170f3a

Please sign in to comment.