Skip to content

Commit

Permalink
feat: add backup feature (#88)
Browse files Browse the repository at this point in the history
User can configure backup-related configurations in the project config yaml, for example:

datastore:
  path: bigquery
  type: bigquery
  backup:
    dataset: optimus_backup
    ttl: 30
    prefix: backup
another datastore can have different backup configurations model.

Backup via cli usage:

optimus backup resource --project [project] --namespace [namespace]
Adding backup list to show recent backups of a project.

optimus backup list --project [project]
  • Loading branch information
arinda-arif authored Oct 8, 2021
1 parent 407057b commit 4622182
Show file tree
Hide file tree
Showing 29 changed files with 4,908 additions and 747 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "ed46a61c267dc47e91d163fbd8f3f3367f272d23"
PROTON_COMMIT := "e3b3bee44b27c5cdfb9276ccda86af4a462614c3"

all: build

Expand Down
87 changes: 86 additions & 1 deletion api/handler/v1/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,16 @@ func (sv *RuntimeServiceServer) BackupDryRun(ctx context.Context, req *pb.Backup
jobSpecs = append(jobSpecs, downstreamSpecs...)
}

resourcesToBackup, err := sv.resourceSvc.BackupResourceDryRun(ctx, projectSpec, namespaceSpec, jobSpecs)
//should add config
backupRequest := models.BackupRequest{
ResourceName: req.ResourceName,
Project: projectSpec,
Namespace: namespaceSpec,
Description: req.Description,
IgnoreDownstream: req.IgnoreDownstream,
DryRun: true,
}
resourcesToBackup, err := sv.resourceSvc.BackupResourceDryRun(ctx, backupRequest, jobSpecs)
if err != nil {
return nil, status.Errorf(codes.Internal, "error while doing backup dry run: %v", err)
}
Expand All @@ -971,6 +980,82 @@ func (sv *RuntimeServiceServer) BackupDryRun(ctx context.Context, req *pb.Backup
}, nil
}

func (sv *RuntimeServiceServer) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) {
projectSpec, err := sv.getProjectSpec(req.ProjectName)
if err != nil {
return nil, err
}

namespaceRepo := sv.namespaceRepoFactory.New(projectSpec)
namespaceSpec, err := namespaceRepo.GetByName(req.Namespace)
if err != nil {
return nil, status.Errorf(codes.NotFound, "%s: namespace %s not found", err.Error(), req.Namespace)
}

resourceSpec, err := sv.resourceSvc.ReadResource(ctx, namespaceSpec, req.DatastoreName, req.ResourceName)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to read resource %s", err.Error(), req.ResourceName)
}

var jobSpecs []models.JobSpec
jobSpec, err := sv.jobSvc.GetByDestination(projectSpec, resourceSpec.URN)
if err != nil {
return nil, status.Errorf(codes.Internal, "error while getting job: %v", err)
}
jobSpecs = append(jobSpecs, jobSpec)

if !req.IgnoreDownstream {
downstreamSpecs, err := sv.jobSvc.GetDownstream(ctx, projectSpec, jobSpec.Name)
if err != nil {
return nil, status.Errorf(codes.Internal, "error while getting job downstream: %v", err)
}
jobSpecs = append(jobSpecs, downstreamSpecs...)
}

backupRequest := models.BackupRequest{
ResourceName: req.ResourceName,
Project: projectSpec,
Namespace: namespaceSpec,
Description: req.Description,
IgnoreDownstream: req.IgnoreDownstream,
DryRun: false,
Config: req.Config,
}
results, err := sv.resourceSvc.BackupResource(ctx, backupRequest, jobSpecs)
if err != nil {
return nil, status.Errorf(codes.Internal, "error while doing backup: %v", err)
}

return &pb.BackupResponse{
Urn: results,
}, nil
}

func (sv *RuntimeServiceServer) ListBackups(ctx context.Context, req *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) {
projectSpec, err := sv.getProjectSpec(req.ProjectName)
if err != nil {
return nil, err
}

results, err := sv.resourceSvc.ListBackupResources(projectSpec, req.DatastoreName)
if err != nil {
return nil, status.Errorf(codes.Internal, "error while getting backup list: %v", err)
}

var backupList []*pb.BackupSpec
for _, result := range results {
backupList = append(backupList, &pb.BackupSpec{
Id: result.ID.String(),
ResourceName: result.Resource.Name,
CreatedAt: timestamppb.New(result.CreatedAt),
Description: result.Description,
})
}
return &pb.ListBackupsResponse{
Backups: backupList,
}, nil
}

func (sv *RuntimeServiceServer) RunJob(ctx context.Context, req *pb.RunJobRequest) (*pb.RunJobResponse, error) {
// create job run in db
projSpec, err := sv.projectRepoFactory.New().GetByName(req.ProjectName)
Expand Down
Loading

0 comments on commit 4622182

Please sign in to comment.