Skip to content

Commit

Permalink
fix: namespace migration via export (#135) (#161)
Browse files Browse the repository at this point in the history
* fix: perform namespace File migration via export
  • Loading branch information
Mryashbhardwaj authored Oct 6, 2023
1 parent a822b3b commit 4913b5e
Showing 1 changed file with 91 additions and 43 deletions.
134 changes: 91 additions & 43 deletions client/cmd/job/change_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package job

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -15,10 +16,10 @@ import (
"github.com/goto/optimus/client/cmd/internal"
"github.com/goto/optimus/client/cmd/internal/connection"
"github.com/goto/optimus/client/cmd/internal/logger"
"github.com/goto/optimus/client/local"
"github.com/goto/optimus/client/local/model"
"github.com/goto/optimus/client/local/specio"
"github.com/goto/optimus/config"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

Expand All @@ -30,6 +31,8 @@ type changeNamespaceCommand struct {
logger log.Logger
connection connection.Connection

writer local.SpecReadWriter[*model.JobSpec]

configFilePath string
clientConfig *config.ClientConfig

Expand Down Expand Up @@ -73,6 +76,12 @@ func (c *changeNamespaceCommand) injectFlags(cmd *cobra.Command) {
}

func (c *changeNamespaceCommand) PreRunE(_ *cobra.Command, _ []string) error {
readWriter, err := specio.NewJobSpecReadWriter(afero.NewOsFs())
if err != nil {
c.logger.Error(err.Error())
}
c.writer = readWriter

// Load mandatory config
conf, err := config.LoadClientConfig(c.configFilePath)
if err != nil {
Expand Down Expand Up @@ -118,64 +127,103 @@ func (c *changeNamespaceCommand) sendChangeNamespaceRequest(jobName string) erro
return err
}

func (c *changeNamespaceCommand) PostRunE(_ *cobra.Command, args []string) error {
c.logger.Info("\n[info] Moving job in filesystem")
jobName := args[0]

oldNamespaceConfig, err := c.getNamespaceConfig(c.oldNamespaceName)
func (c *changeNamespaceCommand) downloadJobSpecFile(fs afero.Fs, jobName, newJobPath string) error {
conn, err := c.connection.Create(c.host)
if err != nil {
c.logger.Error(fmt.Sprintf("[error] old namespace unregistered in filesystem, err: %s", err.Error()))
return nil
return err
}
defer conn.Close()

jobSpecificationServiceClient := pb.NewJobSpecificationServiceClient(conn)

jobSpecReadWriter, err := specio.NewJobSpecReadWriter(afero.NewOsFs())
ctx, cancelFunc := context.WithTimeout(context.Background(), fetchJobTimeout)
defer cancelFunc()

response, err := jobSpecificationServiceClient.GetJobSpecifications(ctx, &pb.GetJobSpecificationsRequest{
ProjectName: c.project,
NamespaceName: c.newNamespaceName,
JobName: jobName,
})
if err != nil {
c.logger.Error(fmt.Sprintf("[error] could not instantiate Spec Readed, err: %s", err.Error()))
return nil
return err
}

if len(response.JobSpecificationResponses) == 0 {
return errors.New("job is not found")
}
jobSpec := model.ToJobSpec(response.JobSpecificationResponses[0].Job)

jobSpec, err := jobSpecReadWriter.ReadByName(oldNamespaceConfig.Job.Path, jobName)
c.logger.Info(fmt.Sprintf("[info] creating job directry: %s", newJobPath))
err = fs.MkdirAll(filepath.Dir(newJobPath), os.ModePerm)
if err != nil {
c.logger.Error(fmt.Sprintf("[error] unable to find job in old namespace directory, err: %s", err.Error()))
return nil
return err
}
return c.writer.Write(newJobPath, jobSpec)
}

fs := afero.NewOsFs()
newNamespaceConfig, err := c.getNamespaceConfig(c.newNamespaceName)
if err != nil || newNamespaceConfig.Job.Path == "" {
c.logger.Warn("[warn] new namespace not recognised for jobs")
c.logger.Warn("[info] run `optimus job export` on the new namespace repo, to fetch the newly moved job.")

c.logger.Warn("[info] removing job from old namespace")
err = fs.RemoveAll(jobSpec.Path)
if err != nil {
c.logger.Error(fmt.Sprintf("[error] unable to remove job from old namespace , err: %s", err.Error()))
c.logger.Warn("[info] consider deleting source files manually if they exist")
return nil
}
c.logger.Warn("[OK] removed job spec from current namespace directory")
return nil
func (c *changeNamespaceCommand) getOldJobPath(jobName string) (string, error) {
oldNamespaceConfig, err := c.getNamespaceConfig(c.oldNamespaceName)
if err != nil {
return "", fmt.Errorf("old namespace does not exist in filesystem, err: %w", err)
}
jobSpec, err := c.writer.ReadByName(oldNamespaceConfig.Job.Path, jobName)
if err != nil {
return "", fmt.Errorf("unable to find job in old namespace directory, err: %w", err)
}
return jobSpec.Path, nil
}

newJobPath := strings.Replace(jobSpec.Path, oldNamespaceConfig.Job.Path, newNamespaceConfig.Job.Path, 1)
func (c *changeNamespaceCommand) deleteJobFile(fs afero.Fs, jobName string) (string, error) {
oldJobPath, err := c.getOldJobPath(jobName)
if err != nil {
return oldJobPath, err
}
c.logger.Info("[info] removing job from old namespace")
err = fs.RemoveAll(oldJobPath)
if err != nil {
return oldJobPath, err
}
c.logger.Info("[OK] removed job spec from old namespace directory")
return oldJobPath, nil
}

c.logger.Info(fmt.Sprintf("\t* Old Path : '%s' \n\t* New Path : '%s' \n", jobSpec.Path, newJobPath))
func (c *changeNamespaceCommand) PostRunE(_ *cobra.Command, args []string) error {
c.logger.Info("\n[info] Moving job in filesystem")
jobName := args[0]
fs := afero.NewOsFs()

c.logger.Info(fmt.Sprintf("[info] creating job directry: %s", newJobPath))
oldJobPath, err := c.deleteJobFile(fs, jobName)
if err != nil {
c.logger.Error(fmt.Sprintf("[error] unable to remove job from old namespace , err: %s", err.Error()))
c.logger.Warn("[info] consider deleting source files from old namespace manually, if they exist")
}

err = fs.MkdirAll(filepath.Dir(newJobPath), os.ModePerm)
var newJobPath string
newNamespaceConfig, err := c.getNamespaceConfig(c.newNamespaceName)
if err != nil {
c.logger.Error(fmt.Sprintf("[error] unable to create path in the new namespace directory, err: %s", err.Error()))
c.logger.Warn("[warn] unable to move job from old namespace")
c.logger.Warn("[info] consider moving source files manually")
return nil
c.logger.Error("[error] new namespace not recognised for jobs: err: %s", err.Error())
}
if newNamespaceConfig.Job.Path == "" {
c.logger.Error("[error] namespace config does not have a defined jobs path")
}
if err != nil || newNamespaceConfig.Job.Path == "" {
c.logger.Warn("[info] register the new namespace and run \n\t`optimus job export -p %s -n %s -r %s `, to fetch the newly moved job.",
c.project, c.newNamespaceName, jobName)
return err
}

oldNamespaceConfig, nsConfigErr := c.getNamespaceConfig(c.oldNamespaceName)
if oldJobPath == "" || nsConfigErr != nil {
newJobPath = fmt.Sprintf("%s/%s", newNamespaceConfig.Job.Path, jobName)
} else {
newJobPath = strings.Replace(oldJobPath, oldNamespaceConfig.Job.Path, newNamespaceConfig.Job.Path, 1)
}

err = fs.Rename(jobSpec.Path, newJobPath)
err = c.downloadJobSpecFile(fs, jobName, newJobPath)
if err != nil {
c.logger.Error(fmt.Sprintf("[warn] unable to move job from old namespace, err: %s", err.Error()))
c.logger.Warn("[info] consider moving source files manually")
return nil
c.logger.Error(fmt.Sprintf("[error] unable to download job spec to new namespace directory, err: %s", err.Error()))
c.logger.Warn("[info] manually run \n\t`optimus job export -p %s -n %s -r %s `, to fetch the newly moved job.",
c.project, c.newNamespaceName, jobName)
}
c.logger.Info("[OK] Job moved successfully")
return nil
Expand All @@ -187,5 +235,5 @@ func (c *changeNamespaceCommand) getNamespaceConfig(namespaceName string) (*conf
return namespace, nil
}
}
return nil, errors.NotFound(tenant.EntityNamespace, "not recognised in config")
return nil, errors.New("not recognised in config")
}

0 comments on commit 4913b5e

Please sign in to comment.