Skip to content

Commit

Permalink
Make the RunPostProcessor more robust
Browse files Browse the repository at this point in the history
This change makes the RunPostProcessor a member of the QueueManager
instead of the InflightMonitor. This is because the InflightMonitor can
be removed from the RunContext before Close() is called. When that
happens, the run never invokes the RunPostProcessor, which in turn
leaves such runs permanently stuck in the 'new' state.
  • Loading branch information
lhchavez committed Jul 31, 2020
1 parent 070450f commit 30db428
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/omegaup-grader/frontend_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func registerFrontendHandlers(ctx *grader.Context, mux *http.ServeMux, db *sql.D
client := &http.Client{Transport: transport}

finishedRunsChan := make(chan *grader.RunInfo, 1)
ctx.InflightMonitor.PostProcessor.AddListener(finishedRunsChan)
ctx.QueueManager.PostProcessor.AddListener(finishedRunsChan)
go runPostProcessor(ctx, db, finishedRunsChan, client)

mux.Handle("/metrics", promhttp.Handler())
Expand Down
1 change: 1 addition & 0 deletions cmd/omegaup-grader/runner_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func processRun(
runCtx.Log.Error("Unable to create grade dir", "err", err, "runner", runnerName)
return &processRunStatus{http.StatusInternalServerError, false}
}
runCtx.RunInfo.Result.JudgedBy = runnerName

multipartReader, err := r.MultipartReader()
if err != nil {
Expand Down
21 changes: 9 additions & 12 deletions grader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,12 @@ func (runCtx *RunContext) Close() {
Type: QueueEventTypeManagerRemoved,
})

close(runCtx.runWaitHandle.ready)
runCtx.queueManager.PostProcessor.PostProcess(runCtx.RunInfo)

runCtx.Context.Close()
}()
var postProcessor *RunPostProcessor
if runCtx.monitor != nil {
postProcessor = runCtx.monitor.PostProcessor
runCtx.monitor.Remove(runCtx.RunInfo.Run.AttemptID)
}
if runCtx.inputRef != nil {
Expand Down Expand Up @@ -441,11 +442,6 @@ func (runCtx *RunContext) Close() {
return
}
}

close(runCtx.runWaitHandle.ready)
if postProcessor != nil {
postProcessor.PostProcess(runCtx.RunInfo)
}
}

// Requeue adds a RunContext back to the Queue from where it came from, if it
Expand Down Expand Up @@ -601,7 +597,6 @@ type InflightRun struct {
// a runner) and tracks their state in case the runner becomes unresponsive.
type InflightMonitor struct {
sync.Mutex
PostProcessor *RunPostProcessor
mapping map[uint64]*InflightRun
connectTimeout time.Duration
readyTimeout time.Duration
Expand All @@ -621,14 +616,11 @@ type RunData struct {

// NewInflightMonitor returns a new InflightMonitor.
func NewInflightMonitor() *InflightMonitor {
monitor := &InflightMonitor{
PostProcessor: NewRunPostProcessor(),
return &InflightMonitor{
mapping: make(map[uint64]*InflightRun),
connectTimeout: time.Duration(10) * time.Minute,
readyTimeout: time.Duration(10) * time.Minute,
}
go monitor.PostProcessor.run()
return monitor
}

// Add creates an InflightRun wrapper for the specified RunContext, adds it to
Expand Down Expand Up @@ -866,6 +858,8 @@ type queueEventListener struct {
// QueueManager is an expvar-friendly manager for Queues.
type QueueManager struct {
sync.Mutex
PostProcessor *RunPostProcessor

mapping map[string]*Queue
channelLength int
events chan *QueueEvent
Expand All @@ -881,6 +875,7 @@ type QueueInfo struct {
// NewQueueManager creates a new QueueManager.
func NewQueueManager(channelLength int, graderRuntimePath string) *QueueManager {
manager := &QueueManager{
PostProcessor: NewRunPostProcessor(),
mapping: make(map[string]*Queue),
channelLength: channelLength,
events: make(chan *QueueEvent, 1),
Expand All @@ -889,6 +884,7 @@ func NewQueueManager(channelLength int, graderRuntimePath string) *QueueManager
}
manager.Add(DefaultQueueName)
go manager.run()
go manager.PostProcessor.run()
return manager
}

Expand Down Expand Up @@ -966,6 +962,7 @@ func (manager *QueueManager) AddEvent(event *QueueEvent) {
// Close terminates the event listener goroutine.
//
// It first closes the manager's events channel and then closes the
// PostProcessor that was created together with the manager in
// NewQueueManager.
// NOTE(review): presumably this lets both goroutines started in
// NewQueueManager (manager.run and PostProcessor.run) exit; confirm against
// their implementations, which are not visible here.
func (manager *QueueManager) Close() {
close(manager.events)
manager.PostProcessor.Close()
}

func (manager *QueueManager) run() {
Expand Down
34 changes: 34 additions & 0 deletions grader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,40 @@ func TestQueue(t *testing.T) {
}
}

// TestQueueRetry verifies that a run which is requeued once, and then refused
// a second requeue, is reported to the QueueManager's PostProcessor listeners
// exactly once by the time the listener signals completion.
func TestQueueRetry(t *testing.T) {
	ctx, err := newGraderContext(t)
	if err != nil {
		t.Fatalf("GraderContext creation failed with %q", err)
	}
	// Clean up the runtime directory unless the configuration asks to keep it.
	if preserve := ctx.Config.Runner.PreserveFiles; !preserve {
		defer os.RemoveAll(ctx.Config.Grader.RuntimePath)
	}

	defaultQueue, err := ctx.QueueManager.Get(DefaultQueueName)
	if err != nil {
		t.Fatalf("default queue not found")
	}

	// Subscribe to post-processed runs before any run is enqueued.
	postProcessed := newListener()
	ctx.QueueManager.PostProcessor.AddListener(postProcessed.c)

	addRun(t, ctx, defaultQueue, QueuePriorityNormal)
	closeNotifier := make(chan bool, 1)
	runCtx, _, _ := defaultQueue.GetRun("test", ctx.InflightMonitor, closeNotifier)

	// The first requeue must be accepted; the second one must be rejected.
	if retried := runCtx.Requeue(true); !retried {
		t.Fatalf("unable to retry run")
	}
	if retriedAgain := runCtx.Requeue(true); retriedAgain {
		t.Fatalf("run requeued even though it was marked as last attempt previously")
	}
	ctx.Close()

	// Wait for the listener to finish before inspecting its counter.
	<-postProcessed.done
	if processed := postProcessed.processed; processed != 1 {
		t.Fatalf("listener.processed == %d, want %d", processed, 1)
	}
}

func TestQueuePriorities(t *testing.T) {
ctx, err := newGraderContext(t)
if err != nil {
Expand Down

0 comments on commit 30db428

Please sign in to comment.