Skip to content

Commit

Permalink
fix(ray): fix upscale deployment failure (#698)
Browse files Browse the repository at this point in the history
Because

- upscaling occasionally fails due to the state check

This commit

- fixes the upscale deployment failure
  • Loading branch information
heiruwu authored Oct 16, 2024
1 parent b118026 commit 96563c9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestWorker_TriggerModelActivity(t *testing.T) {
require.NoError(t, err)
})

t.Run("when model is offline", func(t *testing.T) {
t.Run("when model is error", func(t *testing.T) {
param := &worker.TriggerModelActivityRequest{}
param.UserUID, _ = uuid.NewV4()
param.OwnerUID, _ = uuid.NewV4()
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestWorker_TriggerModelActivity(t *testing.T) {
mockRay := mock.NewRayMock(mc)
ctx := context.Background()

mockRay.ModelReadyMock.Times(1).Return(modelpb.State_STATE_OFFLINE.Enum(), "", 1, nil)
mockRay.ModelReadyMock.Return(modelpb.State_STATE_ERROR.Enum().Enum(), "", 0, nil)

w := worker.NewWorker(rc, mockRay, repo, nil, nil)
err = w.TriggerModelActivity(ctx, param)
Expand Down
26 changes: 20 additions & 6 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,27 @@ func (w *worker) TriggerModelActivity(ctx context.Context, param *TriggerModelAc
// wait for model instance to come online to start processing the request
// temporary solution to not overcharge for credits
// TODO: design a better flow
for {
if state, _, numOfActiveReplica, err := w.ray.ModelReady(ctx, fmt.Sprintf("%s/%s/%s", param.OwnerType, param.OwnerUID, param.ModelID), param.ModelVersion.Version); err != nil {
state, _, numOfActiveReplica, err := w.ray.ModelReady(ctx, fmt.Sprintf("%s/%s/%s", param.OwnerType, param.OwnerUID, param.ModelID), param.ModelVersion.Version)
if err != nil {
return w.toApplicationError(err, param.ModelID, ModelActivityError)
}
for *state == modelpb.State_STATE_OFFLINE {
time.Sleep(time.Millisecond * 500)
state, _, numOfActiveReplica, err = w.ray.ModelReady(ctx, fmt.Sprintf("%s/%s/%s", param.OwnerType, param.OwnerUID, param.ModelID), param.ModelVersion.Version)
if err != nil {
return w.toApplicationError(err, param.ModelID, ModelActivityError)
}
}
for *state != modelpb.State_STATE_ACTIVE || numOfActiveReplica <= 0 {
logger.Debug(fmt.Sprintf("model upscale state: %v", state))
logger.Debug(fmt.Sprintf("model upscale numOfActiveReplica: %v", numOfActiveReplica))
if state, _, numOfActiveReplica, err = w.ray.ModelReady(ctx, fmt.Sprintf("%s/%s/%s", param.OwnerType, param.OwnerUID, param.ModelID), param.ModelVersion.Version); err != nil {
return w.toApplicationError(err, param.ModelID, ModelActivityError)
} else if *state == modelpb.State_STATE_ACTIVE && numOfActiveReplica > 0 {
break
} else if *state != modelpb.State_STATE_SCALING_UP && *state != modelpb.State_STATE_STARTING {
return w.toApplicationError(fmt.Errorf("model upscale failed"), param.ModelID, ModelActivityError)
} else if *state != modelpb.State_STATE_SCALING_UP && *state != modelpb.State_STATE_STARTING && *state != modelpb.State_STATE_ACTIVE {
logger.Error(fmt.Sprintf("model upscale failed: current model state: %v", state), zap.Error(err))
return w.toApplicationError(fmt.Errorf("model upscale failed: current model state: %v", state), param.ModelID, ModelActivityError)
} else {
time.Sleep(time.Millisecond * 500)
}
}

Expand Down

0 comments on commit 96563c9

Please sign in to comment.