Skip to content

Commit

Permalink
Handle replay across Optimus server restarts (#279)
Browse files Browse the repository at this point in the history
* fix: watch replay across restarts

* fix: add test
  • Loading branch information
Mryashbhardwaj authored and deryrahman committed Oct 22, 2024
1 parent ac51512 commit 29403d9
Show file tree
Hide file tree
Showing 10 changed files with 437 additions and 108 deletions.
5 changes: 3 additions & 2 deletions core/scheduler/handler/v1beta1/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestReplayHandler(t *testing.T) {
jobTenant, _ := tenant.NewTenant(projectName, namespaceName)
jobName, _ := scheduler.JobNameFrom("a-job-name")
startTime := timestamppb.New(time.Date(2023, 0o1, 0o1, 13, 0, 0, 0, time.UTC))
updateTime := time.Now()
endTime := timestamppb.New(time.Date(2023, 0o1, 0o2, 13, 0, 0, 0, time.UTC))
jobConfigStr := "EXECUTION_PROJECT=example_project,ANOTHER_CONFIG=example_value"
jobConfig := map[string]string{"EXECUTION_PROJECT": "example_project", "ANOTHER_CONFIG": "example_value"}
Expand Down Expand Up @@ -456,7 +457,7 @@ func TestReplayHandler(t *testing.T) {
startTime, _ := time.Parse(scheduler.ISODateFormat, startTimeStr)
endTime := startTime.Add(48 * time.Hour)
replayConfig := scheduler.NewReplayConfig(startTime, endTime, true, map[string]string{}, description)
replay := scheduler.NewReplay(replayID, "sample-job-A", tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, message)
replay := scheduler.NewReplay(replayID, "sample-job-A", tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, updateTime, message)
service.On("GetReplayByID", ctx, replayID).Return(&scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand Down Expand Up @@ -535,7 +536,7 @@ func TestReplayHandler(t *testing.T) {
startTime, _ := time.Parse(scheduler.ISODateFormat, startTimeStr)
endTime := startTime.Add(48 * time.Hour)
replayConfig := scheduler.NewReplayConfig(startTime, endTime, true, map[string]string{}, description)
replay := scheduler.NewReplay(replayID, "sample-job-A", tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, message)
replay := scheduler.NewReplay(replayID, "sample-job-A", tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, updateTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand Down
9 changes: 7 additions & 2 deletions core/scheduler/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Replay struct {
message string

createdAt time.Time
updatedAt time.Time
}

func (r *Replay) ID() uuid.UUID {
Expand All @@ -91,6 +92,10 @@ func (r *Replay) Tenant() tenant.Tenant {
return r.tenant
}

// UpdatedAt returns the value stored in the replay's updatedAt field.
func (r *Replay) UpdatedAt() time.Time {
return r.updatedAt
}

// Config returns the replay's config field. The pointer itself is returned,
// not a copy of the ReplayConfig it points to.
func (r *Replay) Config() *ReplayConfig {
return r.config
}
Expand Down Expand Up @@ -120,8 +125,8 @@ func NewReplayRequest(jobName JobName, tenant tenant.Tenant, config *ReplayConfi
return &Replay{jobName: jobName, tenant: tenant, config: config, state: state}
}

func NewReplay(id uuid.UUID, jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState, createdAt time.Time, message string) *Replay {
return &Replay{id: id, jobName: jobName, tenant: tenant, config: config, state: state, createdAt: createdAt, message: message}
// NewReplay builds a Replay whose fields are set from the given identifier,
// job name, tenant, config, state, creation time, update time and message,
// and returns a pointer to it.
func NewReplay(id uuid.UUID, jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState, createdAt, updatedAt time.Time, message string) *Replay {
	replay := Replay{
		id:        id,
		jobName:   jobName,
		tenant:    tenant,
		config:    config,
		state:     state,
		message:   message,
		createdAt: createdAt,
		updatedAt: updatedAt,
	}
	return &replay
}

type ReplayWithRun struct {
Expand Down
17 changes: 9 additions & 8 deletions core/scheduler/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ func TestReplay(t *testing.T) {
scheduledTime3 := scheduledTime1.Add(2 * 24 * time.Hour)
scheduledTime4 := scheduledTime1.Add(3 * 24 * time.Hour)
message := "sample message"
updatedTime := time.Now()

t.Run("NewReplay", func(t *testing.T) {
createdTime := time.Now()
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, createdTime, message)
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, createdTime, updatedTime, message)

assert.Equal(t, replayID, replay.ID())
assert.Equal(t, jobNameA, replay.JobName())
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestReplay(t *testing.T) {
}

t.Run("GetFirstExecutableRun", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, time.Now(), message)
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, time.Now(), updatedTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand All @@ -87,7 +88,7 @@ func TestReplay(t *testing.T) {
assert.Equal(t, firstExecutableRun, secondRun)
})
t.Run("GetLastExecutableRun", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, time.Now(), message)
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, time.Now(), updatedTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand Down Expand Up @@ -130,26 +131,26 @@ func TestReplay(t *testing.T) {
t.Run("should return true if it is in termination state", func(t *testing.T) {
createdTime := time.Now()

replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCancelled, createdTime, message)
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCancelled, createdTime, updatedTime, message)
result := replay.IsTerminated()
assert.True(t, result)

replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateFailed, createdTime, message)
replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateFailed, createdTime, updatedTime, message)
result = replay.IsTerminated()
assert.True(t, result)

replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateSuccess, createdTime, message)
replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateSuccess, createdTime, updatedTime, message)
result = replay.IsTerminated()
assert.True(t, result)
})
t.Run("should return false if replay is not in termination state", func(t *testing.T) {
createdTime := time.Now()

replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, createdTime, message)
replay := scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateCreated, createdTime, updatedTime, message)
result := replay.IsTerminated()
assert.False(t, result)

replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateInProgress, createdTime, message)
replay = scheduler.NewReplay(replayID, jobNameA, tnnt, replayConfig, scheduler.ReplayStateInProgress, createdTime, updatedTime, message)
result = replay.IsTerminated()
assert.False(t, result)
})
Expand Down
10 changes: 8 additions & 2 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/goto/salt/log"
Expand Down Expand Up @@ -36,7 +37,11 @@ type ReplayRepository interface {
UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
UpdateReplayRuns(ctx context.Context, replayID uuid.UUID, runs []*scheduler.JobRunStatus) error
UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error
CancelReplayRequest(ctx context.Context, replayID uuid.UUID, message string) error
ScanAbandonedReplayRequests(ctx context.Context, unhandledClassifierDuration time.Duration) ([]*scheduler.Replay, error)
AcquireReplayRequest(ctx context.Context, replayID uuid.UUID, unhandledClassifierDuration time.Duration) error

GetReplayRequestByID(ctx context.Context, replayID uuid.UUID) (*scheduler.Replay, error)
GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error)
GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error)
Expand All @@ -52,7 +57,7 @@ type ReplayValidator interface {
}

type ReplayExecutor interface {
Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus
}
Expand Down Expand Up @@ -117,7 +122,7 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa
State: scheduler.ReplayStateCreated,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName)
go r.executor.Execute(ctx, replayID, replayReq.Tenant(), jobName)

return replayID, nil
}
Expand Down Expand Up @@ -235,6 +240,7 @@ func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *sch
r.logger.Error("unable to sync replay runs status for job [%s]: %s", jobName.String(), err.Error())
return err
}
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", syncedRunStatus))

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateInProgress, scheduler.StateQueued}
toBeCanceledRuns := syncedRunStatus.GetSortedRunsByStates(statesForCanceling)
Expand Down
37 changes: 29 additions & 8 deletions core/scheduler/service/replay_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

func TestReplayService(t *testing.T) {
ctx := context.Background()
now := time.Now()
projName := tenant.ProjectName("proj")
namespaceName := tenant.NamespaceName("ns1")
jobName := scheduler.JobName("sample_select")
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestReplayService(t *testing.T) {
jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil)
replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil)
replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil)
replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe()
replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -148,7 +149,7 @@ func TestReplayService(t *testing.T) {
jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil)
replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil)
replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil)
replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe()
replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -348,7 +349,7 @@ func TestReplayService(t *testing.T) {
defer replayRepository.AssertExpectations(t)

replayID := uuid.New()
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, message)
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, now, message)
replayRepository.On("GetReplayByID", ctx, replayID).Return(&scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand All @@ -372,7 +373,7 @@ func TestReplayService(t *testing.T) {
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)

replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateSuccess, startTime, message)
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateSuccess, startTime, now, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand All @@ -391,7 +392,7 @@ func TestReplayService(t *testing.T) {
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)

replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, message)
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, now, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand All @@ -410,7 +411,7 @@ func TestReplayService(t *testing.T) {
assert.ErrorContains(t, err, errorMsg)
})
t.Run("returns no error if replay has been successfully cancelled", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, message)
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, now, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
Expand Down Expand Up @@ -649,6 +650,16 @@ func (_m *ReplayRepository) UpdateReplayStatus(ctx context.Context, replayID uui
return r0
}

// ScanAbandonedReplayRequests provides a mock function with given fields: ctx, unhandledClassifierDuration
//
// It returns the first configured return value as []*scheduler.Replay (nil
// when that value is nil) and the second configured value as the error.
func (_m *ReplayRepository) ScanAbandonedReplayRequests(ctx context.Context, unhandledClassifierDuration time.Duration) ([]*scheduler.Replay, error) {
	ret := _m.Called(ctx, unhandledClassifierDuration)

	// A test that sets Return(nil, err) stores an untyped nil; asserting it
	// directly to a slice type would panic, so only assert non-nil values.
	var r0 []*scheduler.Replay
	if v := ret.Get(0); v != nil {
		r0 = v.([]*scheduler.Replay)
	}

	return r0, ret.Error(1)
}

// AcquireReplayRequest provides a mock function with given fields: ctx, replayID, unhandledClassifierDuration
//
// It returns the first configured return value as the error.
func (_m *ReplayRepository) AcquireReplayRequest(ctx context.Context, replayID uuid.UUID, unhandledClassifierDuration time.Duration) error {
	return _m.Called(ctx, replayID, unhandledClassifierDuration).Error(0)
}

// GetReplayJobConfig provides a mock function with given fields: ctx, jobTenant, jobName, scheduledAt
func (_m *ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error) {
ret := _m.Called(ctx, jobTenant, jobName, scheduledAt)
Expand All @@ -675,6 +686,16 @@ func (_m *ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant te
return r0, r1
}

// GetReplayRequestByID provides a mock function with given fields: ctx, replayID
//
// It returns the first configured return value as *scheduler.Replay (nil when
// that value is nil) and the second configured value as the error.
func (_m *ReplayRepository) GetReplayRequestByID(ctx context.Context, replayID uuid.UUID) (*scheduler.Replay, error) {
	ret := _m.Called(ctx, replayID)

	// A test that sets Return(nil, err) stores an untyped nil; asserting it
	// directly to *scheduler.Replay would panic, so only assert non-nil values.
	var r0 *scheduler.Replay
	if v := ret.Get(0); v != nil {
		r0 = v.(*scheduler.Replay)
	}

	return r0, ret.Error(1)
}

// CancelReplayRequest provides a mock function with given fields: ctx, replayID, message
//
// It returns the first configured return value as the error.
func (_m *ReplayRepository) CancelReplayRequest(ctx context.Context, replayID uuid.UUID, message string) error {
	return _m.Called(ctx, replayID, message).Error(0)
}

// GetReplayByFilters provides a mock function with given fields: ctx, projectName, filters
func (_m *ReplayRepository) GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error) {
ret := _m.Called(ctx, projectName, filters)
Expand Down Expand Up @@ -722,8 +743,8 @@ type ReplayExecutor struct {
}

// Execute provides a mock function with given fields: ctx, replayID, jobTenant, jobName
func (_m *ReplayExecutor) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
_m.Called(replayID, jobTenant, jobName)
func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
// Forward every argument to the mock's Called method. Execute has no return
// value, so whatever Called returns is intentionally discarded.
_m.Called(ctx, replayID, jobTenant, jobName)
}

func (_m *ReplayExecutor) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
Expand Down
Loading

0 comments on commit 29403d9

Please sign in to comment.