diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..91ec7b1 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Server", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "application/server/main.go" + } + ] +} \ No newline at end of file diff --git a/Makefile b/Makefile index 99d75ba..ac3f17b 100644 --- a/Makefile +++ b/Makefile @@ -72,11 +72,11 @@ run-rpc-server: build check .PHONY: run-background run-background: build check - make -j run-task-worker run-schedule + make -j run-schedule .PHONY: run-all run-all: build check - make -j run-task-worker run-server run-schedule + make -j run-server run-schedule .PHONY: help help: diff --git a/application/schedule/main.go b/application/schedule/main.go index 52414ce..7905807 100644 --- a/application/schedule/main.go +++ b/application/schedule/main.go @@ -1,15 +1,5 @@ package main -import ( - asynqAgent "github.com/OJ-lab/oj-lab-services/core/agent/asynq" - "github.com/OJ-lab/oj-lab-services/service/business" -) - func main() { - asynqAgent.RunSecheduler( - asynqAgent.ScheduleTask{ - Cronspec: "@every 1s", - Task: business.NewTaskJudgerTrackAllState(), - }, - ) + } diff --git a/application/server/handler/judge.go b/application/server/handler/judge.go index fe26220..ba409b6 100644 --- a/application/server/handler/judge.go +++ b/application/server/handler/judge.go @@ -10,6 +10,8 @@ func SetupJudgeRoute(baseRoute *gin.RouterGroup) { g := baseRoute.Group("/judge") { g.POST("/add-judger", postJudger) + g.POST("/task/pick", postPickJudgeTask) + g.POST("/task/report", postReportJudgeTaskResult) } } @@ -29,3 +31,48 @@ func postJudger(ginCtx *gin.Context) { "message": "success", }) } + +type PickJudgeTaskBody struct { + Consumer string `json:"consumer"` +} + +func postPickJudgeTask(ginCtx *gin.Context) { + body := PickJudgeTaskBody{} + if err := ginCtx.ShouldBindJSON(&body); err != nil { + ginCtx.Error(err) + return + } + + task, err := service.PickJudgeTask(ginCtx, body.Consumer) + if err != nil { + ginCtx.Error(err) + return + } + + ginCtx.JSON(200, gin.H{ + "task": task, + }) +} + +type ReportJudgeTaskResultBody struct { + Consumer string `json:"consumer"` + StreamID string `json:"stream_id"` + VerdictJson string `json:"verdict_json"` +} + +func postReportJudgeTaskResult(ginCtx *gin.Context) { + body := ReportJudgeTaskResultBody{} + if err := ginCtx.ShouldBindJSON(&body); err != nil { + ginCtx.Error(err) + return + } + + if err := service.ReportJudgeTaskResult(ginCtx, body.Consumer, body.StreamID, body.VerdictJson); err != nil { + ginCtx.Error(err) + return + } + + ginCtx.JSON(200, gin.H{ + "message": "success", + }) +} diff --git a/application/server/handler/problem.go b/application/server/handler/problem.go index 27bba74..f341137 100644 --- a/application/server/handler/problem.go +++ b/application/server/handler/problem.go @@ -100,11 +100,12 @@ func postSubmission(ginCtx *gin.Context) { return } - submission, err := service.PostSubmission(ginCtx, slug, body.Code, body.Language) - if err != nil { - ginCtx.Error(err) + submission := model.NewSubmission("", slug, body.Code, body.Language) + result, svcErr := service.CreateJudgeTaskSubmission(ginCtx, submission) + if svcErr != nil { + svcErr.AppendToGin(ginCtx) return } - ginCtx.JSON(200, submission) + ginCtx.JSON(200, result) } diff --git a/application/server/main.go b/application/server/main.go index 6e0bf9e..a3a1dad 100644 --- a/application/server/main.go +++ b/application/server/main.go @@ -61,6 +61,7 @@ func main() { handler.SetupProblemRoute(apiRouter) handler.SetupEventRouter(apiRouter) handler.SetupSubmissionRouter(apiRouter) + handler.SetupJudgeRoute(apiRouter) err := r.Run(servicePort) if err != nil { diff --git a/go.mod b/go.mod index 0f79b00..a573a53 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/google/uuid v1.3.1 + github.com/redis/go-redis/v9 v9.1.0 github.com/sirupsen/logrus v1.9.3 github.com/spf13/viper v1.16.0 github.com/swaggo/swag v1.16.2 @@ -78,7 +79,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.9 // indirect - github.com/redis/go-redis/v9 v9.1.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/ugorji/go/codec v1.2.11 // indirect diff --git a/service/business/judge_stream.go b/service/business/judge_stream.go new file mode 100644 index 0000000..53a7888 --- /dev/null +++ b/service/business/judge_stream.go @@ -0,0 +1,78 @@ +package business + +import ( + "context" + + redisAgent "github.com/OJ-lab/oj-lab-services/core/agent/redis" + "github.com/OJ-lab/oj-lab-services/service/model" + "github.com/redis/go-redis/v9" +) + +const ( + streamName = "oj_lab_judge_stream" + consumerGroupName = "oj_lab_judge_stream_consumer_group" + defaultConsumerName = "oj_lab_judge_stream_consumer_default" +) + +func init() { + redisAgent := redisAgent.GetDefaultRedisClient() + _, err := redisAgent.XGroupCreateMkStream(context.Background(), streamName, consumerGroupName, "0").Result() + if err != nil && err != redis.Nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + panic(err) + } +} + +func AddTaskToStream(ctx context.Context, task *model.JudgeTask) (*string, error) { + redisAgent := redisAgent.GetDefaultRedisClient() + id, err := redisAgent.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: task.ToStringMap(), + }).Result() + if err != nil { + return nil, err + } + + return &id, err +} + +func GetTaskFromStream(ctx context.Context, consumer string) (*model.JudgeTask, error) { + redisAgent := redisAgent.GetDefaultRedisClient() + if consumer == "" { + consumer = defaultConsumerName + } + result, err := redisAgent.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroupName, + Consumer: consumer, + Streams: []string{streamName, ">"}, + Count: 1, + Block: -1, + }).Result() + + if err != nil { + return nil, err + } + if len(result) == 0 { + return nil, nil + } + + task := model.JudgeTask{} + for _, message := range result[0].Messages { + task = *model.JudgeTaskFromMap(message.Values) + task.RedisStreamID = &message.ID + } + + return &task, nil +} + +func AckTaskFromStream(ctx context.Context, consumer string, streamID string) error { + redisAgent := redisAgent.GetDefaultRedisClient() + if consumer == "" { + consumer = defaultConsumerName + } + _, err := redisAgent.XAck(ctx, streamName, consumerGroupName, streamID).Result() + if err != nil { + return err + } + + return nil +} diff --git a/service/business/judger_task.go b/service/business/judger_task.go index 92fdba9..06b1502 100644 --- a/service/business/judger_task.go +++ b/service/business/judger_task.go @@ -1,3 +1,5 @@ +// ! Deprected + package business import ( diff --git a/service/mapper/submission.go b/service/mapper/submission.go index 33a2ed4..cb7ad88 100644 --- a/service/mapper/submission.go +++ b/service/mapper/submission.go @@ -1,6 +1,8 @@ package mapper import ( + "fmt" + "github.com/OJ-lab/oj-lab-services/service/model" "github.com/google/uuid" "gorm.io/gorm" @@ -77,3 +79,32 @@ func GetSubmissionListByOptions(tx *gorm.DB, options GetSubmissionOptions) ([]*m return submissions, count, nil } + +func UpdateSubmission(tx *gorm.DB, submission model.JudgeTaskSubmission) error { + updatingSubmission := model.JudgeTaskSubmission{} + if submission.UID != uuid.Nil { + err := tx.Where("uid = ?", submission.UID).First(&updatingSubmission).Error + if err != nil { + return err + } + } else if submission.RedisStreamID != "" { + err := tx.Where("redis_stream_id = ?", submission.RedisStreamID).First(&updatingSubmission).Error + if err != nil { + return err + } + } else { + return fmt.Errorf("submission uid and redis stream id are both empty") + } + + if submission.Status != "" { + updatingSubmission.Status = submission.Status + } + if submission.VerdictJson != "" { + updatingSubmission.VerdictJson = submission.VerdictJson + } + if submission.RedisStreamID != "" { + updatingSubmission.RedisStreamID = submission.RedisStreamID + } + + return tx.Model(&updatingSubmission).Updates(updatingSubmission).Error +} diff --git a/service/model/judge.go b/service/model/judge.go index 313b0e9..8ab87fe 100644 --- a/service/model/judge.go +++ b/service/model/judge.go @@ -2,8 +2,6 @@ package model import ( "strings" - - "github.com/google/uuid" ) type JudgerState string @@ -21,19 +19,28 @@ type Judger struct { } type JudgeTask struct { - UID uuid.UUID `json:"uid"` - ProblemSlug string `json:"problemSlug"` - Code string `json:"code"` - Language string `json:"language"` - Judger Judger `json:"judger"` + SubmissionUID string `json:"submissionUID"` + ProblemSlug string `json:"problemSlug"` + Code string `json:"code"` + Language string `json:"language"` + RedisStreamID *string `json:"redisStreamID"` +} + +func (jt *JudgeTask) ToStringMap() map[string]interface{} { + return map[string]interface{}{ + "submission_uid": jt.SubmissionUID, + "problem_slug": jt.ProblemSlug, + "code": jt.Code, + "language": jt.Language, + } } -func NewJudgeTask(problemSlug, code, language string) *JudgeTask { +func JudgeTaskFromMap(m map[string]interface{}) *JudgeTask { return &JudgeTask{ - UID: uuid.New(), - ProblemSlug: problemSlug, - Code: code, - Language: language, + SubmissionUID: m["submission_uid"].(string), + ProblemSlug: m["problem_slug"].(string), + Code: m["code"].(string), + Language: m["language"].(string), } } diff --git a/service/model/submission.go b/service/model/submission.go index 230dd93..068d8a8 100644 --- a/service/model/submission.go +++ b/service/model/submission.go @@ -1,6 +1,8 @@ package model -import "github.com/google/uuid" +import ( + "github.com/google/uuid" +) type SubmissionStatus string @@ -13,6 +15,10 @@ const ( type SubmissionLanguage string +func (sl SubmissionLanguage) String() string { + return string(sl) +} + const ( SubmissionLanguageCpp SubmissionLanguage = "Cpp" SubmissionLanguageRust SubmissionLanguage = "Rust" @@ -22,15 +28,16 @@ const ( // Using relationship according to https://gorm.io/docs/belongs_to.html type JudgeTaskSubmission struct { MetaFields - UID uuid.UUID `gorm:"primaryKey" json:"uid"` - UserAccount string `gorm:"not null" json:"userAccount"` - User User `json:"user"` - ProblemSlug string `gorm:"not null" json:"problemSlug"` - Problem Problem `json:"problem"` - Code string `gorm:"not null" json:"code"` - Language SubmissionLanguage `gorm:"not null" json:"language"` - Status SubmissionStatus `gorm:"default:pending" json:"status"` - VerdictJson string `json:"verdictJson"` + UID uuid.UUID `gorm:"primaryKey" json:"UID"` + RedisStreamID string `json:"redisStreamID"` + UserAccount string `gorm:"not null" json:"userAccount"` + User User `json:"user"` + ProblemSlug string `gorm:"not null" json:"problemSlug"` + Problem Problem `json:"problem"` + Code string `gorm:"not null" json:"code"` + Language SubmissionLanguage `gorm:"not null" json:"language"` + Status SubmissionStatus `gorm:"default:pending" json:"status"` + VerdictJson string `json:"verdictJson"` } type JudgeTaskSubmissionSortByColumn string @@ -54,3 +61,12 @@ func NewSubmission( Status: SubmissionStatusPending, } } + +func (s *JudgeTaskSubmission) ToJudgeTask() JudgeTask { + return JudgeTask{ + SubmissionUID: s.UID.String(), + ProblemSlug: s.ProblemSlug, + Code: s.Code, + Language: s.Language.String(), + } +} diff --git a/service/problem.go b/service/problem.go index ba4764d..5477715 100644 --- a/service/problem.go +++ b/service/problem.go @@ -37,19 +37,6 @@ func PutProblemPackage(ctx context.Context, slug, zipFile string) error { return nil } -func PostSubmission( - ctx context.Context, problemSlug, code string, language model.SubmissionLanguage, -) (*model.JudgeTaskSubmission, error) { - submission := model.NewSubmission("", problemSlug, code, language) - db := gormAgent.GetDefaultDB() - result, err := mapper.CreateSubmission(db, submission) - if err != nil { - return nil, err - } - - return result, nil -} - // func Judge(ctx context.Context, slug string, code string, language string) ( // []map[string]interface{}, error, // ) { diff --git a/service/submission.go b/service/submission.go index 108ead7..4780a9a 100644 --- a/service/submission.go +++ b/service/submission.go @@ -2,9 +2,11 @@ package service import ( "context" + "fmt" "github.com/OJ-lab/oj-lab-services/core" gormAgent "github.com/OJ-lab/oj-lab-services/core/agent/gorm" + "github.com/OJ-lab/oj-lab-services/service/business" "github.com/OJ-lab/oj-lab-services/service/mapper" "github.com/OJ-lab/oj-lab-services/service/model" ) @@ -30,5 +32,17 @@ func CreateJudgeTaskSubmission( return nil, core.NewInternalError("failed to create submission") } + task := newSubmission.ToJudgeTask() + streamId, err := business.AddTaskToStream(ctx, &task) + if err != nil { + return nil, core.NewInternalError(fmt.Sprintf("failed to add task to stream %v", err)) + } + + newSubmission.RedisStreamID = *streamId + err = mapper.UpdateSubmission(db, *newSubmission) + if err != nil { + return nil, core.NewInternalError("failed to update submission") + } + return newSubmission, nil } diff --git a/service/task.go b/service/task.go new file mode 100644 index 0000000..13b8a28 --- /dev/null +++ b/service/task.go @@ -0,0 +1,51 @@ +package service + +import ( + "context" + + gormAgent "github.com/OJ-lab/oj-lab-services/core/agent/gorm" + "github.com/OJ-lab/oj-lab-services/service/business" + "github.com/OJ-lab/oj-lab-services/service/mapper" + "github.com/OJ-lab/oj-lab-services/service/model" + "github.com/google/uuid" +) + +func PickJudgeTask(ctx context.Context, consumer string) (*model.JudgeTask, error) { + task, err := business.GetTaskFromStream(ctx, consumer) + if err != nil { + return nil, err + } + + db := gormAgent.GetDefaultDB() + err = mapper.UpdateSubmission(db, model.JudgeTaskSubmission{ + UID: uuid.MustParse(task.SubmissionUID), + Status: model.SubmissionStatusRunning, + }) + if err != nil { + return nil, err + } + + return task, nil +} + +func ReportJudgeTaskResult( + ctx context.Context, + consumer string, streamID string, verdictJson string, +) error { + db := gormAgent.GetDefaultDB() + err := mapper.UpdateSubmission(db, model.JudgeTaskSubmission{ + RedisStreamID: streamID, + Status: model.SubmissionStatusFinished, + VerdictJson: verdictJson, + }) + if err != nil { + return err + } + + err = business.AckTaskFromStream(ctx, consumer, streamID) + if err != nil { + return err + } + + return nil +}