Skip to content

Commit

Permalink
Make the run queue non-blocking from the frontend
Browse files Browse the repository at this point in the history
This change makes the interface from the frontend be non-blocking. Now
instead of injecting every single run one by one into the queue (which
causes massive backlogs upon rejudges), we immediately return an OK and
rely on MySQL polling for queue management.
  • Loading branch information
lhchavez committed Aug 7, 2020
1 parent 2b98a49 commit b9954b3
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 73 deletions.
216 changes: 144 additions & 72 deletions cmd/omegaup-grader/frontend_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,28 +235,132 @@ func runPostProcessor(
}
}

func getPendingRuns(ctx *grader.Context, db *sql.DB) ([]int64, error) {
rows, err := db.Query(
`SELECT
run_id
FROM
func runQueueLoop(
ctx *grader.Context,
runs *grader.Queue,
db *sql.DB,
newRuns <-chan struct{},
) {
ctx.Log.Info("Starting run queue loop")
_, err := db.Exec(
`
UPDATE
Runs
SET
status = 'new'
WHERE
status != 'ready';`)
status != 'ready';
`)
if err != nil {
return nil, err
ctx.Log.Error("Failed to reset pending runs", "err", err)
}
defer rows.Close()
var runIds []int64
for rows.Next() {
var runID int64
err = rows.Scan(&runID)
if err != nil {
return nil, err

var maxRunID int64
err = db.QueryRow(
`SELECT
MAX(r.run_id)
FROM
Runs r;
`).Scan(
&maxRunID,
)
if err != nil {
ctx.Log.Error("Failed to get the max run ID", "err", err)
}
ctx.Log.Debug("Max run ID found", "maxRunID", maxRunID)

for range newRuns {
ctx.Log.Debug("New run in the queue")
totalRunsInRound := 0
// Every time a new notification arrives, continuously get all the new runs
// from the database until there's nothing new.
hasNewRuns := true
for hasNewRuns {
hasNewRuns = false
rows, err := db.Query(
`
(
SELECT
run_id
FROM
Runs
WHERE
status = 'new'
AND
run_id > ?
ORDER BY
run_id
)
UNION
(
SELECT
run_id
FROM
Runs
WHERE
status = 'new'
AND
run_id <= ?
ORDER BY
run_id
)
LIMIT 128;
`,
maxRunID,
maxRunID,
)
if err != nil {
ctx.Log.Error("Failed to get new runs", "err", err)
break
}
for rows.Next() {
hasNewRuns = true
var runID int64
err = rows.Scan(&runID)
_, err := db.Exec(
`
UPDATE
Runs
SET
status = 'waiting'
WHERE
run_id = ?;
`,
runID)
if err != nil {
ctx.Log.Error("Failed to mark a run as waiting", "err", err)
continue
}
runInfo, err := newRunInfoFromID(ctx, db, runID)
if err != nil {
ctx.Log.Error(
"Error getting run information",
"err", err,
"runId", runID,
)
continue
}

priority := grader.QueuePriorityNormal
if maxRunID >= runID {
priority = grader.QueuePriorityLow
} else {
maxRunID = runID
}
if err := injectRuns(
ctx,
runs,
priority,
runInfo,
); err != nil {
ctx.Log.Error("Error injecting run", "runId", runID, "err", err)
}
totalRunsInRound++
}
rows.Close()
}
runIds = append(runIds, runID)
ctx.Log.Debug("Round finished", "runs processed", totalRunsInRound)
}
return runIds, nil
}

// gradeDir gets the new-style Run ID-based path.
Expand Down Expand Up @@ -437,41 +541,17 @@ func broadcast(
return nil
}

func registerFrontendHandlers(ctx *grader.Context, mux *http.ServeMux, db *sql.DB) {
func registerFrontendHandlers(
ctx *grader.Context,
mux *http.ServeMux,
newRuns chan struct{},
db *sql.DB,
) {
runs, err := ctx.QueueManager.Get(grader.DefaultQueueName)
if err != nil {
panic(err)
}
runIds, err := getPendingRuns(ctx, db)
if err != nil {
ctx.Log.Error("Failed to read pending runs", "err", err)
}
// Don't block while the runs are being injected. This prevents potential
// deadlocks where there are more runs than what the queue can hold, and the
// queue cannot be drained unless the transport is connected.
go func() {
ctx.Log.Info("Injecting pending runs", "count", len(runIds))
for _, runID := range runIds {
runInfo, err := newRunInfoFromID(ctx, db, runID)
if err != nil {
ctx.Log.Error(
"Error getting run information",
"err", err,
"runId", runID,
)
continue
}
if err := injectRuns(
ctx,
runs,
grader.QueuePriorityNormal,
runInfo,
); err != nil {
ctx.Log.Error("Error injecting run", "runId", runID, "err", err)
}
}
ctx.Log.Info("Injected pending runs", "count", len(runIds))
}()
go runQueueLoop(ctx, runs, db, newRuns)

transport := &http.Transport{
Dial: (&net.Dialer{
Expand Down Expand Up @@ -602,13 +682,19 @@ func registerFrontendHandlers(ctx *grader.Context, mux *http.ServeMux, db *sql.D
}
defer f.Close()

io.Copy(f, r.Body)

if err = injectRuns(ctx, runs, grader.QueuePriorityNormal, runInfo); err != nil {
ctx.Log.Info("/run/new/", "guid", runInfo.GUID, "response", "internal server error", "err", err)
if _, err := io.Copy(f, r.Body); err != nil {
ctx.Log.Info("/run/new/", "guid", runInfo.GUID, "response", "failed to copy submission", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// Try to notify the channel that there's something new. If it has already
// been notified, do nothing.
select {
case newRuns <- struct{}{}:
default:
}

ctx.Log.Info("/run/new/", "guid", runInfo.GUID, "response", "ok")
w.WriteHeader(http.StatusOK)
})
Expand All @@ -624,28 +710,14 @@ func registerFrontendHandlers(ctx *grader.Context, mux *http.ServeMux, db *sql.D
return
}
ctx.Log.Info("/run/grade/", "request", request)
priority := grader.QueuePriorityNormal
if request.Rejudge || request.Debug {
priority = grader.QueuePriorityLow
}
var runInfos []*grader.RunInfo
for _, runID := range request.RunIDs {
runInfo, err := newRunInfoFromID(ctx, db, runID)
if err != nil {
ctx.Log.Error(
"Error getting run info",
"err", err,
"run id", runID,
)
w.WriteHeader(http.StatusInternalServerError)
return
}
runInfos = append(runInfos, runInfo)
}
if err = injectRuns(ctx, runs, priority, runInfos...); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return

// Try to notify the channel that there's something new. If it has already
// been notified, do nothing.
select {
case newRuns <- struct{}{}:
default:
}

w.Header().Set("Content-Type", "text/json; charset=utf-8")
fmt.Fprintf(w, "{\"status\":\"ok\"}")
})
Expand Down
8 changes: 7 additions & 1 deletion cmd/omegaup-grader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,14 @@ func main() {
graderContext().QueueManager.AddEventListener(queueEventsChan)
go queueEventsProcessor(queueEventsChan)

// A channel that signals that there are pending runs.
newRuns := make(chan struct{}, 1)
// Seed the channel with one token so that the queue loop can start injecting
// runs, even if there are no runs available.
newRuns <- struct{}{}
{
mux := http.DefaultServeMux
registerFrontendHandlers(graderContext(), mux, db)
registerFrontendHandlers(graderContext(), mux, newRuns, db)
shutdowners = append(
shutdowners,
common.RunServer(
Expand Down Expand Up @@ -297,6 +302,7 @@ func main() {

cancel()
wg.Wait()
close(newRuns)

ctx.Close()
ctx.Log.Info("Server gracefully stopped.")
Expand Down

0 comments on commit b9954b3

Please sign in to comment.