feat: replay to support dry run (#91)
* feat: update proto to support replay dry run

* feat: replay with dry run implementation on handler

* feat: get run status implementation on service

* test: add dry run create replay test on handler

* feat: inject run getter on replay service

* feat: implement get run status service

* feat: print replay dry run statuses on client

* feat: revert proto

* fix: linter

* revamp: enhance user exp

* feat: update proto

* revamp: change header name on create dry run

* feat: use dedicated request param for replay dryrun

* feat: update proto commit

* fix: trailing space

* refactor: rename resp on dryrun reply

* feat: update proton commit

* refactor: reuse getCron
deryrahman authored Aug 1, 2023
1 parent 38eb9df commit ccfa271
Showing 10 changed files with 531 additions and 131 deletions.
115 changes: 89 additions & 26 deletions client/cmd/replay/create.go
@@ -1,6 +1,7 @@
package replay

import (
"bytes"
"errors"
"fmt"
"time"
@@ -36,6 +37,7 @@ type createCommand struct {
configFilePath string

parallel bool
dryRun bool
description string
jobConfig string

@@ -82,6 +84,7 @@ func (r *createCommand) injectFlags(cmd *cobra.Command) {
cmd.Flags().BoolVarP(&r.parallel, "parallel", "", false, "Backfill job runs in parallel")
cmd.Flags().StringVarP(&r.description, "description", "d", "", "Description of why backfill is needed")
cmd.Flags().StringVarP(&r.jobConfig, "job-config", "", "", "additional job configurations")
cmd.Flags().BoolVarP(&r.dryRun, "dry-run", "", false, "Inspect the runs that would be replayed without affecting the scheduler")

// Mandatory flags if config is not set
cmd.Flags().StringVarP(&r.projectName, "project-name", "p", "", "Name of the optimus project")
@@ -118,14 +121,91 @@ func (r *createCommand) RunE(_ *cobra.Command, args []string) error {
endTime = args[2]
}

replayID, err := r.createReplayRequest(jobName, startTime, endTime, r.jobConfig)
replayReq, err := r.createReplayRequest(jobName, startTime, endTime, r.jobConfig)
if err != nil {
return err
}

if r.dryRun {
replayDryRunReq := convertReplayToReplayDryRunRequest(replayReq)
return r.replayDryRun(replayDryRunReq)
}

return r.replay(replayReq)
}

func convertReplayToReplayDryRunRequest(replayReq *pb.ReplayRequest) *pb.ReplayDryRunRequest {
return &pb.ReplayDryRunRequest{
ProjectName: replayReq.GetProjectName(),
JobName: replayReq.GetJobName(),
NamespaceName: replayReq.GetNamespaceName(),
StartTime: replayReq.GetStartTime(),
EndTime: replayReq.GetEndTime(),
Parallel: replayReq.GetParallel(),
Description: replayReq.GetDescription(),
JobConfig: replayReq.GetJobConfig(),
}
}

func (r *createCommand) replayDryRun(replayDryRunReq *pb.ReplayDryRunRequest) error {
conn, err := r.connection.Create(r.host)
if err != nil {
return err
}
defer conn.Close()

replayService := pb.NewReplayServiceClient(conn)

ctx, cancelFunc := context.WithTimeout(context.Background(), replayTimeout)
defer cancelFunc()

resp, err := replayService.ReplayDryRun(ctx, replayDryRunReq)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.logger.Error("Replay dry-run took too long, timing out")
}
return fmt.Errorf("replay dry-run request failed: %w", err)
}

runs := resp.GetReplayRuns()

buff := &bytes.Buffer{}
header := []string{"scheduled at", "status"}
stringifyReplayRuns(buff, header, runs)

r.logger.Info("List of runs to be replayed:")
r.logger.Info(buff.String())
return nil
}

func (r *createCommand) replay(replayReq *pb.ReplayRequest) error {
conn, err := r.connection.Create(r.host)
if err != nil {
return err
}
defer conn.Close()

replayService := pb.NewReplayServiceClient(conn)

ctx, cancelFunc := context.WithTimeout(context.Background(), replayTimeout)
defer cancelFunc()

resp, err := replayService.Replay(ctx, replayReq)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.logger.Error("Replay creation took too long, timing out")
}
return fmt.Errorf("replay request failed: %w", err)
}

r.logger.Info("Replay request is accepted and it is in progress")
r.logger.Info("Either you could wait or you could close (ctrl+c) and check the status with `optimus replay status %s` command later", replayID)
r.logger.Info("Either you could wait or you could close (ctrl+c) and check the status with `optimus replay status %s` command later", resp.Id)

return r.waitForReplayState(replayID)
return r.waitForReplayState(resp.Id)
}

func (r *createCommand) waitForReplayState(replayID string) error {
@@ -157,28 +237,16 @@ func (r *createCommand) getReplay(replayID string) (*pb.GetReplayResponse, error
return getReplay(r.host, replayID, r.connection)
}

func (r *createCommand) createReplayRequest(jobName, startTimeStr, endTimeStr, jobConfig string) (string, error) {
conn, err := r.connection.Create(r.host)
if err != nil {
return "", err
}
defer conn.Close()

replayService := pb.NewReplayServiceClient(conn)

func (r *createCommand) createReplayRequest(jobName, startTimeStr, endTimeStr, jobConfig string) (*pb.ReplayRequest, error) {
startTime, err := getTimeProto(startTimeStr)
if err != nil {
return "", err
return nil, err
}
endTime, err := getTimeProto(endTimeStr)
if err != nil {
return "", err
return nil, err
}

ctx, cancelFunc := context.WithTimeout(context.Background(), replayTimeout)
defer cancelFunc()

respStream, err := replayService.Replay(ctx, &pb.ReplayRequest{
replayReq := &pb.ReplayRequest{
ProjectName: r.projectName,
JobName: jobName,
NamespaceName: r.namespaceName,
@@ -187,14 +255,9 @@ func (r *createCommand) createReplayRequest(jobName, startTimeStr, endTimeStr, j
Parallel: r.parallel,
Description: r.description,
JobConfig: jobConfig,
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
r.logger.Error("Replay creation took too long, timing out")
}
return "", fmt.Errorf("replay request failed: %w", err)
}
return respStream.Id, nil

return replayReq, nil
}

func getTimeProto(timeStr string) (*timestamppb.Timestamp, error) {
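
The dry-run path hinges on convertReplayToReplayDryRunRequest copying every field faithfully, which makes a field-by-field unit test the natural guard. The commit's own tests live on the handler side (in files not expanded in this view); the sketch below is a hypothetical client-side test, using only the pb fields visible in the diff above and an import path assumed from the repo layout.

package replay

import (
	"testing"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	// import path assumed from the repo layout; not shown in this diff
	pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

func TestConvertReplayToReplayDryRunRequest(t *testing.T) {
	replayReq := &pb.ReplayRequest{
		ProjectName:   "sample-project",
		JobName:       "sample-job",
		NamespaceName: "sample-namespace",
		StartTime:     timestamppb.New(time.Date(2023, 7, 1, 0, 0, 0, 0, time.UTC)),
		EndTime:       timestamppb.New(time.Date(2023, 7, 3, 0, 0, 0, 0, time.UTC)),
		Parallel:      true,
		Description:   "backfill first week of July",
		JobConfig:     "EXECUTION_PROJECT=staging",
	}

	dryRunReq := convertReplayToReplayDryRunRequest(replayReq)

	// Every field of the dry-run request must mirror the original request.
	if dryRunReq.GetProjectName() != replayReq.GetProjectName() ||
		dryRunReq.GetJobName() != replayReq.GetJobName() ||
		dryRunReq.GetNamespaceName() != replayReq.GetNamespaceName() ||
		dryRunReq.GetParallel() != replayReq.GetParallel() ||
		dryRunReq.GetDescription() != replayReq.GetDescription() ||
		dryRunReq.GetJobConfig() != replayReq.GetJobConfig() {
		t.Fatalf("scalar fields not copied faithfully: %+v", dryRunReq)
	}
	if !dryRunReq.GetStartTime().AsTime().Equal(replayReq.GetStartTime().AsTime()) ||
		!dryRunReq.GetEndTime().AsTime().Equal(replayReq.GetEndTime().AsTime()) {
		t.Fatalf("time window not copied faithfully: %+v", dryRunReq)
	}
}
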
15 changes: 5 additions & 10 deletions client/cmd/replay/status.go
@@ -136,7 +136,8 @@ func stringifyReplayStatus(resp *pb.GetReplayResponse) string {
}

if len(resp.GetReplayRuns()) > 0 {
stringifyReplayRuns(buff, resp.GetReplayRuns())
header := []string{"scheduled at", "current status"}
stringifyReplayRuns(buff, header, resp.GetReplayRuns())
}

return buff.String()
@@ -145,24 +146,18 @@ func stringifyReplayStatus(resp *pb.GetReplayResponse) string {
func stringifyReplayConfig(buff *bytes.Buffer, jobConfig map[string]string) {
table := tablewriter.NewWriter(buff)
table.SetBorder(false)
table.SetHeader([]string{
"config key",
"config value",
})
table.SetHeader([]string{"config key", "config value"})
table.SetAlignment(tablewriter.ALIGN_LEFT)
for k, v := range jobConfig {
table.Append([]string{k, v})
}
table.Render()
}

func stringifyReplayRuns(buff *bytes.Buffer, runs []*pb.ReplayRun) {
func stringifyReplayRuns(buff *bytes.Buffer, header []string, runs []*pb.ReplayRun) {
table := tablewriter.NewWriter(buff)
table.SetBorder(false)
table.SetHeader([]string{
"scheduled at",
"status",
})
table.SetHeader(header)
table.SetAlignment(tablewriter.ALIGN_LEFT)
for _, run := range runs {
table.Append([]string{
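
With the header now supplied by the caller, the dry-run listing ("status") and the status command ("current status") share one renderer. A minimal sketch of how a caller drives it — the runs are made up for illustration, and the pb import path is an assumption, as above.

package replay

import (
	"bytes"
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	// import path assumed from the repo layout; not shown in this diff
	pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

func exampleDryRunListing() {
	runs := []*pb.ReplayRun{
		{ScheduledAt: timestamppb.New(time.Date(2023, 7, 1, 2, 0, 0, 0, time.UTC)), Status: "pending"},
		{ScheduledAt: timestamppb.New(time.Date(2023, 7, 2, 2, 0, 0, 0, time.UTC)), Status: "pending"},
	}

	buff := &bytes.Buffer{}
	// The dry-run path labels the column "status"; the status command passes
	// "current status" instead — same renderer, different header.
	stringifyReplayRuns(buff, []string{"scheduled at", "status"}, runs)
	fmt.Print(buff.String())
}
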
113 changes: 79 additions & 34 deletions core/scheduler/handler/v1beta1/replay.go
@@ -18,6 +18,18 @@ type ReplayService interface {
CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)
GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
GetReplayByID(ctx context.Context, replayID uuid.UUID) (replay *scheduler.ReplayWithRun, err error)
GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (runs []*scheduler.JobRunStatus, err error)
}

type replayRequest interface {
GetProjectName() string
GetNamespaceName() string
GetJobName() string
GetStartTime() *timestamppb.Timestamp
GetEndTime() *timestamppb.Timestamp
GetJobConfig() string
GetParallel() bool
GetDescription() string
}

type ReplayHandler struct {
@@ -27,44 +39,32 @@ type ReplayHandler struct {
pb.UnimplementedReplayServiceServer
}

func (h ReplayHandler) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) {
replayTenant, err := tenant.NewTenant(req.GetProjectName(), req.NamespaceName)
func (h ReplayHandler) ReplayDryRun(ctx context.Context, req *pb.ReplayDryRunRequest) (*pb.ReplayDryRunResponse, error) {
replayReq, err := newReplayRequest(h.l, req)
if err != nil {
h.l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return nil, errors.GRPCErr(err, "unable to start replay for "+req.GetJobName())
return nil, err
}

jobName, err := scheduler.JobNameFrom(req.GetJobName())
runs, err := h.service.GetRunsStatus(ctx, replayReq.Tenant(), replayReq.JobName(), replayReq.Config())
if err != nil {
h.l.Error("error adapting job name [%s]: %s", req.GetJobName(), err)
return nil, errors.GRPCErr(err, "unable to start replay for "+req.GetJobName())
}

if err = req.GetStartTime().CheckValid(); err != nil {
h.l.Error("error validating start time: %s", err)
return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid start_time"), "unable to start replay for "+req.GetJobName())
h.l.Error("error fetching runs status for replay dry run: %s", err)
return nil, errors.GRPCErr(err, "unable to fetch runs status for "+req.GetJobName())
}

if req.GetEndTime() != nil {
if err = req.GetEndTime().CheckValid(); err != nil {
h.l.Error("error validating end time: %s", err)
return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid end_time"), "unable to start replay for "+req.GetJobName())
}
}
return &pb.ReplayDryRunResponse{
ReplayRuns: replayRunsToProto(runs),
}, nil
}

jobConfig := make(map[string]string)
if req.JobConfig != "" {
jobConfig, err = parseJobConfig(req.JobConfig)
if err != nil {
h.l.Error("error parsing job config: %s", err)
return nil, errors.GRPCErr(err, "unable to parse replay job config for "+req.JobName)
}
func (h ReplayHandler) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) {
replayReq, err := newReplayRequest(h.l, req)
if err != nil {
return nil, err
}

replayConfig := scheduler.NewReplayConfig(req.GetStartTime().AsTime(), req.GetEndTime().AsTime(), req.Parallel, jobConfig, req.Description)
replayID, err := h.service.CreateReplay(ctx, replayTenant, jobName, replayConfig)
replayID, err := h.service.CreateReplay(ctx, replayReq.Tenant(), replayReq.JobName(), replayReq.Config())
if err != nil {
h.l.Error("error creating replay for job [%s]: %s", jobName, err)
h.l.Error("error creating replay for job [%s]: %s", req.GetJobName(), err)
return nil, errors.GRPCErr(err, "unable to start replay for "+req.GetJobName())
}

@@ -110,18 +110,63 @@ func (h ReplayHandler) GetReplay(ctx context.Context, req *pb.GetReplayRequest)
return nil, errors.GRPCErr(err, "unable to get replay for replayID "+req.GetReplayId())
}

runs := make([]*pb.ReplayRun, len(replay.Runs))
for i, run := range replay.Runs {
runs[i] = &pb.ReplayRun{
replayProto := replayToProto(replay.Replay)
replayProto.ReplayRuns = replayRunsToProto(replay.Runs)

return replayProto, nil
}

func replayRunsToProto(runs []*scheduler.JobRunStatus) []*pb.ReplayRun {
runsProto := make([]*pb.ReplayRun, len(runs))
for i, run := range runs {
runsProto[i] = &pb.ReplayRun{
ScheduledAt: timestamppb.New(run.ScheduledAt),
Status: run.State.String(),
}
}
return runsProto
}

replayProto := replayToProto(replay.Replay)
replayProto.ReplayRuns = runs
func newReplayRequest(l log.Logger, req replayRequest) (*scheduler.Replay, error) {
replayTenant, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return nil, errors.GRPCErr(err, "unable to start replay for "+req.GetJobName())
}

return replayProto, nil
jobName, err := scheduler.JobNameFrom(req.GetJobName())
if err != nil {
l.Error("error adapting job name [%s]: %s", req.GetJobName(), err)
return nil, errors.GRPCErr(err, "unable to start replay for "+req.GetJobName())
}

if err = req.GetStartTime().CheckValid(); err != nil {
l.Error("error validating start time: %s", err)
return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid start_time"), "unable to start replay for "+req.GetJobName())
}

if req.GetEndTime() != nil {
if err = req.GetEndTime().CheckValid(); err != nil {
l.Error("error validating end time: %s", err)
return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid end_time"), "unable to start replay for "+req.GetJobName())
}
}

jobConfig := make(map[string]string)
if req.GetJobConfig() != "" {
jobConfig, err = parseJobConfig(req.GetJobConfig())
if err != nil {
l.Error("error parsing job config: %s", err)
return nil, errors.GRPCErr(err, "unable to parse replay job config for "+req.GetJobName())
}
}

replayConfig := scheduler.NewReplayConfig(req.GetStartTime().AsTime(), req.GetEndTime().AsTime(), req.GetParallel(), jobConfig, req.GetDescription())

return scheduler.NewReplayRequest(jobName, replayTenant, replayConfig, scheduler.ReplayStateCreated), nil
}

func replayToProto(replay *scheduler.Replay) *pb.GetReplayResponse {
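
The extracted newReplayRequest serves both RPCs because the two generated request messages satisfy the small replayRequest interface through their protoc-generated getters. A compile-time assertion makes that dependency explicit; a sketch one could add alongside the interface:

// Both generated messages must keep satisfying replayRequest, or the shared
// newReplayRequest helper stops compiling for one of the two RPC paths.
var (
	_ replayRequest = (*pb.ReplayRequest)(nil)
	_ replayRequest = (*pb.ReplayDryRunRequest)(nil)
)
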
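The service-side GetRunsStatus implementation sits in files not expanded in this view; per the commit messages it injects a run getter on the replay service and reuses getCron. In essence, a dry run has to expand the requested [start, end] window into the job's scheduled times and attach whatever status the scheduler already reports for each. A self-contained illustration of that idea, with hypothetical types and robfig/cron standing in for Optimus' own cron helper — the "missing" label for runs the scheduler has never seen is an assumption for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

// runStatus is a hypothetical stand-in for scheduler.JobRunStatus.
type runStatus struct {
	ScheduledAt time.Time
	State       string
}

// expectedRuns expands [start, end] into the job's scheduled times and
// attaches the state the scheduler already reports for each.
func expectedRuns(cronExpr string, start, end time.Time, known map[time.Time]string) ([]runStatus, error) {
	schedule, err := cron.ParseStandard(cronExpr)
	if err != nil {
		return nil, err
	}
	var runs []runStatus
	// Next is strictly-after, so step back one second to include a run at start.
	for t := schedule.Next(start.Add(-time.Second)); !t.IsZero() && !t.After(end); t = schedule.Next(t) {
		state, ok := known[t]
		if !ok {
			state = "missing" // no run on the scheduler yet (illustrative label)
		}
		runs = append(runs, runStatus{ScheduledAt: t, State: state})
	}
	return runs, nil
}

func main() {
	start := time.Date(2023, 7, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2023, 7, 3, 0, 0, 0, 0, time.UTC)
	known := map[time.Time]string{
		time.Date(2023, 7, 1, 2, 0, 0, 0, time.UTC): "success",
	}
	runs, err := expectedRuns("0 2 * * *", start, end, known) // daily at 02:00
	if err != nil {
		panic(err)
	}
	for _, r := range runs {
		fmt.Println(r.ScheduledAt.Format(time.RFC3339), r.State)
	}
}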
