diff --git a/cmd/kubectl-k8ssandra/register/command.go b/cmd/kubectl-k8ssandra/register/command.go index 95de8a5..358c8d6 100644 --- a/cmd/kubectl-k8ssandra/register/command.go +++ b/cmd/kubectl-k8ssandra/register/command.go @@ -1,6 +1,7 @@ package register import ( + "errors" "fmt" "github.com/charmbracelet/log" @@ -13,7 +14,7 @@ var RegisterClusterCmd = &cobra.Command{ Use: "register [flags]", Short: "register a data plane into the control plane.", Long: `register creates a ServiceAccount on a source cluster, copies its credentials and then creates a secret containing them on the destination cluster. It then also creates a ClientConfig on the destination cluster to reference the secret.`, - Run: entrypoint, + RunE: entrypoint, } func SetupRegisterClusterCmd(cmd *cobra.Command, streams genericclioptions.IOStreams) { @@ -40,23 +41,24 @@ func SetupRegisterClusterCmd(cmd *cobra.Command, streams genericclioptions.IOStr cmd.AddCommand(RegisterClusterCmd) } -func entrypoint(cmd *cobra.Command, args []string) { +func entrypoint(cmd *cobra.Command, args []string) error { executor := NewRegistrationExecutorFromRegisterClusterCmd(*cmd) + // TODO What is this magic number 30? for i := 0; i < 30; i++ { - res := executor.RegisterCluster() - switch v := res.(type) { - case RetryableError: - log.Info("Registration continuing", "msg", v.Error()) + if err := executor.RegisterCluster(); err != nil { + if errors.Is(err, NonRecoverableError{}) { + log.Error(fmt.Sprintf("Registration failed: %s", err.Error())) + return err + } + log.Info("Registration still in progress", "msg", err.Error()) continue - case nil: - log.Info("Registration completed successfully") - return - case NonRecoverableError: - panic(fmt.Sprintf("Registration failed: %s", v.Error())) } + log.Info("Registration completed successfully") + return nil } - fmt.Println("Registration failed - retries exceeded") + log.Error("Registration failed - retries exceeded") + return nil } func NewRegistrationExecutorFromRegisterClusterCmd(cmd cobra.Command) *RegistrationExecutor { diff --git a/cmd/kubectl-k8ssandra/register/errors.go b/cmd/kubectl-k8ssandra/register/errors.go index 1a86f87..7f8280e 100644 --- a/cmd/kubectl-k8ssandra/register/errors.go +++ b/cmd/kubectl-k8ssandra/register/errors.go @@ -1,11 +1,7 @@ package register -type RetryableError struct { - Message string -} - -func (e RetryableError) Error() string { - return e.Message +func NonRecoverable(err string) NonRecoverableError { + return NonRecoverableError{Message: err} } type NonRecoverableError struct { diff --git a/cmd/kubectl-k8ssandra/register/register_test.go b/cmd/kubectl-k8ssandra/register/register_test.go index 56f1cd8..08f48c7 100644 --- a/cmd/kubectl-k8ssandra/register/register_test.go +++ b/cmd/kubectl-k8ssandra/register/register_test.go @@ -2,6 +2,7 @@ package register import ( "context" + "errors" "fmt" "os" "testing" @@ -25,15 +26,6 @@ func TestRegister(t *testing.T) { testDir, err := os.MkdirTemp("", "k8ssandra-operator-test-****") require.NoError(err) - // buildDir := filepath.Join(envtest.RootDir(), "build") - // testDir := filepath.Join(buildDir, time.Now()) - - // if _, err := os.Stat(testDir); os.IsNotExist(err) { - // err := os.MkdirAll(testDir, os.ModePerm) - // require.NoError(err) - // } else if err != nil { - // require.NoError(err) - // } t.Cleanup(func() { require.NoError(os.RemoveAll(testDir)) }) @@ -73,19 +65,17 @@ func TestRegister(t *testing.T) { ctx := context.Background() require.Eventually(func() bool { - res := ex.RegisterCluster() - switch v := res.(type) { - case RetryableError: - if res.Error() == "no secret found for service account k8ssandra-operator" { + if err := ex.RegisterCluster(); err != nil { + if errors.Is(err, NonRecoverableError{}) { + require.FailNow(fmt.Sprintf("Registration failed: %s", err.Error())) + } + if err.Error() == "no secret found for service account k8ssandra-operator" { return true } - case nil: - return true - case NonRecoverableError: - panic(fmt.Sprintf("Registration failed: %s", v.Error())) + return false } - return false - }, time.Second*30, time.Second*5) + return true + }, time.Second*5, time.Millisecond*100) // This relies on a controller that is not running in the envtest. @@ -115,9 +105,8 @@ func TestRegister(t *testing.T) { // Continue reconciliation require.Eventually(func() bool { - res := ex.RegisterCluster() - return res == nil - }, time.Second*300, time.Second*1) + return ex.RegisterCluster() == nil + }, time.Second*3, time.Millisecond*100) if err := configapi.AddToScheme(client2.Scheme()); err != nil { require.NoError(err) @@ -138,7 +127,7 @@ func TestRegister(t *testing.T) { return false } return err == nil - }, time.Second*60, time.Second*5) + }, time.Second*6, time.Millisecond*100) destKubeconfig := ClientConfigFromSecret(destSecret) require.Equal( diff --git a/cmd/kubectl-k8ssandra/register/registration.go b/cmd/kubectl-k8ssandra/register/registration.go index 71f68a6..164125c 100644 --- a/cmd/kubectl-k8ssandra/register/registration.go +++ b/cmd/kubectl-k8ssandra/register/registration.go @@ -56,32 +56,37 @@ func (e *RegistrationExecutor) RegisterCluster() error { registration.GetKubeconfigFileLocation(e.SourceKubeconfig), e.SourceContext, registration.GetKubeconfigFileLocation(e.DestKubeconfig), e.DestContext, ) + if e.SourceContext == e.DestContext && e.SourceKubeconfig == e.DestKubeconfig { - return NonRecoverableError{Message: "source and destination context and kubeconfig are the same, you should not register the same cluster to itself. Reference it by leaving the k8sContext field blank instead"} + return NonRecoverable("source and destination context and kubeconfig are the same, you should not register the same cluster to itself. Reference it by leaving the k8sContext field blank instead") } + srcClient, err := registration.GetClient(e.SourceKubeconfig, e.SourceContext) if err != nil { - return RetryableError{Message: err.Error()} + return err } + destClient, err := registration.GetClient(e.DestKubeconfig, e.DestContext) if err != nil { - return RetryableError{Message: err.Error()} + return err } + // Get ServiceAccount serviceAccount := &corev1.ServiceAccount{} if err := srcClient.Get(e.Context, client.ObjectKey{Name: e.ServiceAccount, Namespace: e.SourceNamespace}, serviceAccount); err != nil { if apierrors.IsNotFound(err) { if err := srcClient.Create(e.Context, getDefaultServiceAccount(e.ServiceAccount, e.SourceNamespace)); err != nil { - return RetryableError{Message: err.Error()} + return err } } - return RetryableError{Message: err.Error()} + return err } // Get a secret in this namespace which holds the service account token secretsList := &corev1.SecretList{} if err := srcClient.List(e.Context, secretsList, client.InNamespace(e.SourceNamespace)); err != nil { - return RetryableError{Message: err.Error()} + return err } + var secret *corev1.Secret for _, s := range secretsList.Items { if s.Annotations["kubernetes.io/service-account.name"] == e.ServiceAccount && s.Type == corev1.SecretTypeServiceAccountToken { @@ -89,26 +94,28 @@ func (e *RegistrationExecutor) RegisterCluster() error { break } } + if secret == nil { secret = getDefaultSecret(e.SourceNamespace, e.ServiceAccount) if err := srcClient.Create(e.Context, secret); err != nil { - return RetryableError{Message: err.Error()} + return err } - return RetryableError{Message: fmt.Sprintf("no secret found for service account %s", e.ServiceAccount)} + return fmt.Errorf("no secret found for service account %s", e.ServiceAccount) } // Create Secret on destination cluster host, err := registration.KubeconfigToHost(e.SourceKubeconfig, e.SourceContext) if err != nil { - return RetryableError{Message: err.Error()} + return err } saConfig, err := registration.TokenToKubeconfig(*secret, host, e.DestinationName) if err != nil { - return RetryableError{fmt.Sprintf("error converting token to kubeconfig: %s, secret: %#v", err.Error(), secret)} + return fmt.Errorf("error converting token to kubeconfig: %s, secret: %#v", err.Error(), secret) } + secretData, err := clientcmd.Write(saConfig) if err != nil { - return RetryableError{Message: err.Error()} + return err } destSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -121,13 +128,14 @@ func (e *RegistrationExecutor) RegisterCluster() error { }, } if err := destClient.Create(e.Context, &destSecret); err != nil && !errors.IsAlreadyExists(err) { - return RetryableError{fmt.Sprintf("error creating secret. err: %s sa %s", err, e.ServiceAccount)} + return fmt.Errorf("error creating secret. err: %s sa %s", err, e.ServiceAccount) } // Create ClientConfig on destination cluster if err := configapi.AddToScheme(destClient.Scheme()); err != nil { - return RetryableError{Message: err.Error()} + return err } + destClientConfig := configapi.ClientConfig{ ObjectMeta: metav1.ObjectMeta{ Name: e.DestinationName, @@ -141,7 +149,8 @@ func (e *RegistrationExecutor) RegisterCluster() error { }, } if err := destClient.Create(e.Context, &destClientConfig); err != nil && !errors.IsAlreadyExists(err) { - return RetryableError{Message: err.Error()} + return err } + return nil } diff --git a/pkg/helmutil/crds_test.go b/pkg/helmutil/crds_test.go index 017767b..ef86021 100644 --- a/pkg/helmutil/crds_test.go +++ b/pkg/helmutil/crds_test.go @@ -66,7 +66,7 @@ func TestUpgradingCRDs(t *testing.T) { require.NoError(kubeClient.Get(context.TODO(), client.ObjectKey{Name: cassDCCRD.GetName()}, cassDCCRD)) newver := cassDCCRD.GetResourceVersion() return newver != ver - }, time.Minute*1, time.Second*5) + }, time.Minute*1, time.Millisecond*100) descRunsAsCassandra = cassDCCRD.Spec.Versions[0].DeepCopy().Schema.OpenAPIV3Schema.Properties["spec"].Properties["dockerImageRunsAsCassandra"].Description require.True(strings.HasPrefix(descRunsAsCassandra, "DEPRECATED"))