Skip to content

Commit

Permalink
Prepare judge stream business
Browse files Browse the repository at this point in the history
  • Loading branch information
slhmy committed Jan 6, 2024
1 parent ace7e1f commit b93d8c0
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 53 deletions.
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Server",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "application/server/main.go"
}
]
}
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ run-rpc-server: build check

.PHONY: run-background
run-background: build check
make -j run-task-worker run-schedule
make -j run-schedule

.PHONY: run-all
run-all: build check
make -j run-task-worker run-server run-schedule
make -j run-server run-schedule

.PHONY: help
help:
Expand Down
12 changes: 1 addition & 11 deletions application/schedule/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package main

import (
asynqAgent "github.com/OJ-lab/oj-lab-services/core/agent/asynq"
"github.com/OJ-lab/oj-lab-services/service/business"
)

func main() {
asynqAgent.RunSecheduler(
asynqAgent.ScheduleTask{
Cronspec: "@every 1s",
Task: business.NewTaskJudgerTrackAllState(),
},
)

}
47 changes: 47 additions & 0 deletions application/server/handler/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func SetupJudgeRoute(baseRoute *gin.RouterGroup) {
g := baseRoute.Group("/judge")
{
g.POST("/add-judger", postJudger)
g.POST("/task/pick", postPickJudgeTask)
g.POST("/task/report", postReportJudgeTaskResult)
}
}

Expand All @@ -29,3 +31,48 @@ func postJudger(ginCtx *gin.Context) {
"message": "success",
})
}

type PickJudgeTaskBody struct {
Consumer string `json:"consumer"`
}

func postPickJudgeTask(ginCtx *gin.Context) {
body := PickJudgeTaskBody{}
if err := ginCtx.ShouldBindJSON(&body); err != nil {
ginCtx.Error(err)
return
}

task, err := service.PickJudgeTask(ginCtx, body.Consumer)
if err != nil {
ginCtx.Error(err)
return
}

ginCtx.JSON(200, gin.H{
"task": task,
})
}

type ReportJudgeTaskResultBody struct {
Consumer string `json:"consumer"`
StreamID string `json:"stream_id"`
VerdictJson string `json:"verdict_json"`
}

func postReportJudgeTaskResult(ginCtx *gin.Context) {
body := ReportJudgeTaskResultBody{}
if err := ginCtx.ShouldBindJSON(&body); err != nil {
ginCtx.Error(err)
return
}

if err := service.ReportJudgeTaskResult(ginCtx, body.Consumer, body.StreamID, body.VerdictJson); err != nil {
ginCtx.Error(err)
return
}

ginCtx.JSON(200, gin.H{
"message": "success",
})
}
9 changes: 5 additions & 4 deletions application/server/handler/problem.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ func postSubmission(ginCtx *gin.Context) {
return
}

submission, err := service.PostSubmission(ginCtx, slug, body.Code, body.Language)
if err != nil {
ginCtx.Error(err)
submission := model.NewSubmission("", slug, body.Code, body.Language)
result, svcErr := service.CreateJudgeTaskSubmission(ginCtx, submission)
if svcErr != nil {
svcErr.AppendToGin(ginCtx)
return
}

ginCtx.JSON(200, submission)
ginCtx.JSON(200, result)
}
1 change: 1 addition & 0 deletions application/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func main() {
handler.SetupProblemRoute(apiRouter)
handler.SetupEventRouter(apiRouter)
handler.SetupSubmissionRouter(apiRouter)
handler.SetupJudgeRoute(apiRouter)

err := r.Run(servicePort)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/google/uuid v1.3.1
github.com/redis/go-redis/v9 v9.1.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0
github.com/swaggo/swag v1.16.2
Expand Down Expand Up @@ -78,7 +79,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.9 // indirect
github.com/redis/go-redis/v9 v9.1.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/ugorji/go/codec v1.2.11 // indirect
Expand Down
78 changes: 78 additions & 0 deletions service/business/judge_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package business

import (
"context"

redisAgent "github.com/OJ-lab/oj-lab-services/core/agent/redis"
"github.com/OJ-lab/oj-lab-services/service/model"
"github.com/redis/go-redis/v9"
)

const (
streamName = "oj_lab_judge_stream"
consumerGroupName = "oj_lab_judge_stream_consumer_group"
defaultConsumerName = "oj_lab_judge_stream_consumer_default"
)

func init() {
redisAgent := redisAgent.GetDefaultRedisClient()
_, err := redisAgent.XGroupCreateMkStream(context.Background(), streamName, consumerGroupName, "0").Result()
if err != nil && err != redis.Nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
panic(err)
}
}

func AddTaskToStream(ctx context.Context, task *model.JudgeTask) (*string, error) {
redisAgent := redisAgent.GetDefaultRedisClient()
id, err := redisAgent.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: task.ToStringMap(),
}).Result()
if err != nil {
return nil, err
}

return &id, err
}

func GetTaskFromStream(ctx context.Context, consumer string) (*model.JudgeTask, error) {
redisAgent := redisAgent.GetDefaultRedisClient()
if consumer == "" {
consumer = defaultConsumerName
}
result, err := redisAgent.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: consumerGroupName,
Consumer: consumer,
Streams: []string{streamName, ">"},
Count: 1,
Block: -1,
}).Result()

if err != nil {
return nil, err
}
if len(result) == 0 {
return nil, nil
}

task := model.JudgeTask{}
for _, message := range result[0].Messages {
task = *model.JudgeTaskFromMap(message.Values)
task.RedisStreamID = &message.ID
}

return &task, nil
}

func AckTaskFromStream(ctx context.Context, consumer string, streamID string) error {
redisAgent := redisAgent.GetDefaultRedisClient()
if consumer == "" {
consumer = defaultConsumerName
}
_, err := redisAgent.XAck(ctx, streamName, consumerGroupName, streamID).Result()
if err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions service/business/judger_task.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ! Deprected

package business

import (
Expand Down
31 changes: 31 additions & 0 deletions service/mapper/submission.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mapper

import (
"fmt"

"github.com/OJ-lab/oj-lab-services/service/model"
"github.com/google/uuid"
"gorm.io/gorm"
Expand Down Expand Up @@ -77,3 +79,32 @@ func GetSubmissionListByOptions(tx *gorm.DB, options GetSubmissionOptions) ([]*m

return submissions, count, nil
}

func UpdateSubmission(tx *gorm.DB, submission model.JudgeTaskSubmission) error {
updatingSubmission := model.JudgeTaskSubmission{}
if submission.UID != uuid.Nil {
err := tx.Where("uid = ?", submission.UID).First(&updatingSubmission).Error
if err != nil {
return err
}
} else if submission.RedisStreamID != "" {
err := tx.Where("redis_stream_id = ?", submission.RedisStreamID).First(&updatingSubmission).Error
if err != nil {
return err
}
} else {
return fmt.Errorf("submission uid and redis stream id are both empty")
}

if submission.Status != "" {
updatingSubmission.Status = submission.Status
}
if submission.VerdictJson != "" {
updatingSubmission.VerdictJson = submission.VerdictJson
}
if submission.RedisStreamID != "" {
updatingSubmission.RedisStreamID = submission.RedisStreamID
}

return tx.Model(&updatingSubmission).Updates(updatingSubmission).Error
}
31 changes: 19 additions & 12 deletions service/model/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package model

import (
"strings"

"github.com/google/uuid"
)

type JudgerState string
Expand All @@ -21,19 +19,28 @@ type Judger struct {
}

type JudgeTask struct {
UID uuid.UUID `json:"uid"`
ProblemSlug string `json:"problemSlug"`
Code string `json:"code"`
Language string `json:"language"`
Judger Judger `json:"judger"`
SubmissionUID string `json:"submissionUID"`
ProblemSlug string `json:"problemSlug"`
Code string `json:"code"`
Language string `json:"language"`
RedisStreamID *string `json:"redisStreamID"`
}

func (jt *JudgeTask) ToStringMap() map[string]interface{} {
return map[string]interface{}{
"submission_uid": jt.SubmissionUID,
"problem_slug": jt.ProblemSlug,
"code": jt.Code,
"language": jt.Language,
}
}

func NewJudgeTask(problemSlug, code, language string) *JudgeTask {
func JudgeTaskFromMap(m map[string]interface{}) *JudgeTask {
return &JudgeTask{
UID: uuid.New(),
ProblemSlug: problemSlug,
Code: code,
Language: language,
SubmissionUID: m["submission_uid"].(string),
ProblemSlug: m["problem_slug"].(string),
Code: m["code"].(string),
Language: m["language"].(string),
}
}

Expand Down
36 changes: 26 additions & 10 deletions service/model/submission.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package model

import "github.com/google/uuid"
import (
"github.com/google/uuid"
)

type SubmissionStatus string

Expand All @@ -13,6 +15,10 @@ const (

type SubmissionLanguage string

func (sl SubmissionLanguage) String() string {
return string(sl)
}

const (
SubmissionLanguageCpp SubmissionLanguage = "Cpp"
SubmissionLanguageRust SubmissionLanguage = "Rust"
Expand All @@ -22,15 +28,16 @@ const (
// Using relationship according to https://gorm.io/docs/belongs_to.html
type JudgeTaskSubmission struct {
MetaFields
UID uuid.UUID `gorm:"primaryKey" json:"uid"`
UserAccount string `gorm:"not null" json:"userAccount"`
User User `json:"user"`
ProblemSlug string `gorm:"not null" json:"problemSlug"`
Problem Problem `json:"problem"`
Code string `gorm:"not null" json:"code"`
Language SubmissionLanguage `gorm:"not null" json:"language"`
Status SubmissionStatus `gorm:"default:pending" json:"status"`
VerdictJson string `json:"verdictJson"`
UID uuid.UUID `gorm:"primaryKey" json:"UID"`
RedisStreamID string `json:"redisStreamID"`
UserAccount string `gorm:"not null" json:"userAccount"`
User User `json:"user"`
ProblemSlug string `gorm:"not null" json:"problemSlug"`
Problem Problem `json:"problem"`
Code string `gorm:"not null" json:"code"`
Language SubmissionLanguage `gorm:"not null" json:"language"`
Status SubmissionStatus `gorm:"default:pending" json:"status"`
VerdictJson string `json:"verdictJson"`
}

type JudgeTaskSubmissionSortByColumn string
Expand All @@ -54,3 +61,12 @@ func NewSubmission(
Status: SubmissionStatusPending,
}
}

func (s *JudgeTaskSubmission) ToJudgeTask() JudgeTask {
return JudgeTask{
SubmissionUID: s.UID.String(),
ProblemSlug: s.ProblemSlug,
Code: s.Code,
Language: s.Language.String(),
}
}
Loading

0 comments on commit b93d8c0

Please sign in to comment.