Skip to content

Commit

Permalink
fix: refactor
Browse files Browse the repository at this point in the history
FMotalleb committed Jun 6, 2024
1 parent 6486788 commit d01891e
Showing 6 changed files with 135 additions and 99 deletions.
48 changes: 48 additions & 0 deletions core/jobs/initializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jobs

import (
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/config"
cfgcompiler "github.com/FMotalleb/crontab-go/config/compiler"
"github.com/FMotalleb/crontab-go/core/goutils"
)

func initEventSignal(schedulers []abstraction.Scheduler, logger *logrus.Entry) <-chan any {
signals := make([]<-chan any, 0, len(schedulers))
for _, sh := range schedulers {
signals = append(signals, sh.BuildTickChannel())
}
logger.Trace("Signals Built")
signal := goutils.Zip(signals...)
return signal
}

func initTasks(job config.JobConfig, logger *logrus.Entry) ([]abstraction.Executable, []abstraction.Executable, []abstraction.Executable) {
tasks := make([]abstraction.Executable, 0, len(job.Tasks))
doneHooks := make([]abstraction.Executable, 0, len(job.Hooks.Done))
failHooks := make([]abstraction.Executable, 0, len(job.Hooks.Failed))
for _, t := range job.Tasks {
tasks = append(tasks, cfgcompiler.CompileTask(&t, logger))
}
logger.Trace("Compiled Tasks")
for _, t := range job.Hooks.Done {
doneHooks = append(doneHooks, cfgcompiler.CompileTask(&t, logger))
}
logger.Trace("Compiled Hooks.Done")
for _, t := range job.Hooks.Failed {
failHooks = append(failHooks, cfgcompiler.CompileTask(&t, logger))
}
logger.Trace("Compiled Hooks.Fail")
return tasks, doneHooks, failHooks
}

func initSchedulers(job config.JobConfig, cronInstance *cron.Cron, logger *logrus.Entry) []abstraction.Scheduler {
schedulers := make([]abstraction.Scheduler, 0, len(job.Schedulers))
for _, sh := range job.Schedulers {
schedulers = append(schedulers, cfgcompiler.CompileScheduler(&sh, cronInstance, logger))
}
return schedulers
}
48 changes: 48 additions & 0 deletions core/jobs/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jobs

import (
"context"

"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/cmd"
"github.com/FMotalleb/crontab-go/config"
"github.com/FMotalleb/crontab-go/ctxutils"
)

func InitializeJobs(log *logrus.Entry, cronInstance *cron.Cron) {
for _, job := range cmd.CFG.Jobs {
if !job.Enabled {
log.Warnf("job %s is disabled", job.Name)
continue
}

c := context.Background()
c = context.WithValue(c, ctxutils.JobKey, job)

logger := initLogger(c, log, job)

if err := job.Validate(); err != nil {
log.Panicf("failed to validate job (%s): %v", job.Name, err)
}

schedulers := initSchedulers(job, cronInstance, logger)
logger.Trace("Schedulers initialized")

tasks, doneHooks, failHooks := initTasks(job, logger)
logger.Trace("Tasks initialized")

signal := initEventSignal(schedulers, logger)

go taskHandler(c, logger, signal, tasks, doneHooks, failHooks)
logger.Trace("EventLoop initialized")
}
log.Debugln("Jobs Are Ready")
}

func initLogger(c context.Context, log *logrus.Entry, job config.JobConfig) *logrus.Entry {
logger := log.WithContext(c).WithField("job.name", job.Name)
logger.Trace("Initializing Job")
return logger
}
35 changes: 35 additions & 0 deletions core/jobs/task_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package jobs

import (
"context"

"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/ctxutils"
)

func taskHandler(c context.Context, logger *logrus.Entry, signal <-chan any, tasks []abstraction.Executable, doneHooks []abstraction.Executable, failHooks []abstraction.Executable) {
logger.Debug("Spawning task handler")
for range signal {
logger.Trace("Signal Received")
for _, task := range tasks {
go executeTask(c, task, doneHooks, failHooks)
}
}
}

func executeTask(c context.Context, task abstraction.Executable, doneHooks []abstraction.Executable, failHooks []abstraction.Executable) {
ctx := context.WithValue(c, ctxutils.TaskKey, task)
err := task.Execute(ctx)
switch err {
case nil:
for _, task := range doneHooks {
_ = task.Execute(ctx)
}
default:
for _, task := range failHooks {
_ = task.Execute(ctx)
}
}
}
20 changes: 0 additions & 20 deletions ctxutils/context.go

This file was deleted.

1 change: 1 addition & 0 deletions ctxutils/keys.go
Original file line number Diff line number Diff line change
@@ -7,4 +7,5 @@ var (
LoggerKey ContextKey = ContextKey("logger")
RetryCountKey ContextKey = ContextKey("retry-count")
JobKey ContextKey = ContextKey("job")
TaskKey ContextKey = ContextKey("task")
)
82 changes: 3 additions & 79 deletions main.go
Original file line number Diff line number Diff line change
@@ -17,96 +17,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package main

import (
"context"

"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/cmd"
cfgcompiler "github.com/FMotalleb/crontab-go/config/compiler"
"github.com/FMotalleb/crontab-go/core/goutils"
"github.com/FMotalleb/crontab-go/ctxutils"
cx "github.com/FMotalleb/crontab-go/ctxutils"
"github.com/FMotalleb/crontab-go/core/jobs"
"github.com/FMotalleb/crontab-go/logger"
)

var (
log *logrus.Entry
ctx cx.Context
)

func main() {
cmd.Execute()
ctx = cx.NewContext("core")
logger.InitFromConfig()
log = logger.SetupLogger("Crontab-GO")
log := logger.SetupLogger("Crontab-GO")
cronInstance := cron.New(cron.WithSeconds())
log.Info("Booting up")
for _, job := range cmd.CFG.Jobs {
if !job.Enabled {
log.Warn("job %s is disabled", job.Name)
continue
}
c := context.Background()
c = context.WithValue(c, ctxutils.JobKey, job)
logger := log.WithContext(c).WithField("job.name", job.Name)
logger.Trace("Initializing Job")
if err := job.Validate(); err != nil {
log.Panicln("failed to validate job: ", err)
}
schedulers := make([]abstraction.Scheduler, 0, len(job.Schedulers))
for _, sh := range job.Schedulers {
schedulers = append(schedulers, cfgcompiler.CompileScheduler(&sh, cronInstance, logger))
}
logger.Trace("Compiled Schedulers")
tasks := make([]abstraction.Executable, 0, len(job.Tasks))
doneHooks := make([]abstraction.Executable, 0, len(job.Hooks.Done))
failHooks := make([]abstraction.Executable, 0, len(job.Hooks.Failed))
for _, t := range job.Tasks {
tasks = append(tasks, cfgcompiler.CompileTask(&t, logger))
}

logger.Trace("Compiled Tasks")
for _, t := range job.Hooks.Done {
doneHooks = append(doneHooks, cfgcompiler.CompileTask(&t, logger))
}
logger.Trace("Compiled Hooks.Done")
for _, t := range job.Hooks.Failed {
failHooks = append(failHooks, cfgcompiler.CompileTask(&t, logger))
}
logger.Trace("Compiled Hooks.Fail")

signals := make([]<-chan any, 0, len(schedulers))

for _, sh := range schedulers {
signals = append(signals, sh.BuildTickChannel())
}
logger.Trace("Signals Built")
signal := goutils.Zip(signals...)

logger.Trace("Zipping Signals")
go func() {
logger.Debug("Spawned work goroutine")
for range signal {
logger.Trace("Signal Received")
for _, task := range tasks {
ctx := context.Background()
err := task.Execute(ctx)
switch err {
case nil:
for _, task := range doneHooks {
_ = task.Execute(ctx)
}
default:
for _, task := range failHooks {
_ = task.Execute(ctx)
}
}
}
}
}()
}
jobs.InitializeJobs(log, cronInstance)
cronInstance.Start()
<-make(chan any)
}

0 comments on commit d01891e

Please sign in to comment.