From 1bfd62d5f592b74b879a9e1b678f58fc1c4c5ed3 Mon Sep 17 00:00:00 2001 From: Motalleb Fallahnezhad Date: Fri, 7 Jun 2024 16:53:51 +0330 Subject: [PATCH] feat: on_init scheduler --- config.example.yaml | 1 + config/compiler/compiler.go | 4 ++++ config/config.go | 1 + config/validators.go | 4 +++- core/goutils/goutils.go | 2 +- core/jobs/initializer.go | 2 +- core/jobs/runner.go | 14 ++++++++++---- core/schedule/init.go | 22 ++++++++++++++++++++++ core/schedule/interval.go | 13 ++----------- schema.json | 12 +++++++++++- 10 files changed, 56 insertions(+), 19 deletions(-) create mode 100644 core/schedule/init.go diff --git a/config.example.yaml b/config.example.yaml index f643eed..4d58057 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -42,6 +42,7 @@ jobs: data: key: value schedulers: + - on_init: true # Schedulers can be defined using either a cron expression or an interval, but not both simultaneously. # However, you can combine multiple cron expressions and intervals within the same scheduler. diff --git a/config/compiler/compiler.go b/config/compiler/compiler.go index 1d5870d..5beb0c5 100644 --- a/config/compiler/compiler.go +++ b/config/compiler/compiler.go @@ -25,6 +25,10 @@ func CompileScheduler(sh *config.JobScheduler, cr *cron.Cron, logger *logrus.Ent logger, ) return &scheduler + + case sh.OnInit: + scheduler := schedule.Init{} + return &scheduler } return nil diff --git a/config/config.go b/config/config.go index e0b8243..773c95d 100644 --- a/config/config.go +++ b/config/config.go @@ -33,6 +33,7 @@ type ( JobScheduler struct { Cron string `mapstructure:"cron" json:"cron,omitempty"` Interval time.Duration `mapstructure:"interval" json:"interval,omitempty"` + OnInit bool `mapstructure:"on_init" json:"on_init,omitempty"` } JobHooks struct { diff --git a/config/validators.go b/config/validators.go index 9a23336..714b74f 100644 --- a/config/validators.go +++ b/config/validators.go @@ -110,6 +110,7 @@ func (s *JobScheduler) Validate() error { schedules := []bool{ s.Interval != 0, s.Cron != "", + s.OnInit == true, } activeSchedules := 0 for _, t := range schedules { @@ -119,7 +120,8 @@ func (s *JobScheduler) Validate() error { } if activeSchedules != 1 { return fmt.Errorf( - "a single scheduler must have one of (at,interval,cron) field, received:(cron: `%s`, interval: `%s`)", + "a single scheduler must have one of (on_init: true,interval,cron) field, received:(on_init: %t,cron: `%s`, interval: `%s`)", + s.OnInit, s.Cron, s.Interval, ) diff --git a/core/goutils/goutils.go b/core/goutils/goutils.go index a43717b..de2ba2a 100644 --- a/core/goutils/goutils.go +++ b/core/goutils/goutils.go @@ -1,6 +1,6 @@ package goutils -func Zip[T interface{}](channels ...<-chan T) <-chan T { +func ZipChannels[T interface{}](channels ...<-chan T) <-chan T { output := make(chan T) for _, ch := range channels { go func() { diff --git a/core/jobs/initializer.go b/core/jobs/initializer.go index 44a1a4b..e85853c 100644 --- a/core/jobs/initializer.go +++ b/core/jobs/initializer.go @@ -16,7 +16,7 @@ func initEventSignal(schedulers []abstraction.Scheduler, logger *logrus.Entry) < signals = append(signals, sh.BuildTickChannel()) } logger.Trace("Signals Built") - signal := goutils.Zip(signals...) + signal := goutils.ZipChannels(signals...) return signal } diff --git a/core/jobs/runner.go b/core/jobs/runner.go index bddbab2..2c914e3 100644 --- a/core/jobs/runner.go +++ b/core/jobs/runner.go @@ -27,20 +27,26 @@ func InitializeJobs(log *logrus.Entry, cronInstance *cron.Cron) { log.Panicf("failed to validate job (%s): %v", job.Name, err) } - schedulers := initSchedulers(job, cronInstance, logger) - logger.Trace("Schedulers initialized") + signal := buildSignal(job, cronInstance, logger) tasks, doneHooks, failHooks := initTasks(job, logger) logger.Trace("Tasks initialized") - signal := initEventSignal(schedulers, logger) - go taskHandler(c, logger, signal, tasks, doneHooks, failHooks) logger.Trace("EventLoop initialized") } log.Debugln("Jobs Are Ready") } +func buildSignal(job config.JobConfig, cronInstance *cron.Cron, logger *logrus.Entry) <-chan any { + schedulers := initSchedulers(job, cronInstance, logger) + logger.Trace("Schedulers initialized") + + signal := initEventSignal(schedulers, logger) + + return signal +} + func initLogger(c context.Context, log *logrus.Entry, job config.JobConfig) *logrus.Entry { logger := log.WithContext(c).WithField("job.name", job.Name) logger.Trace("Initializing Job") diff --git a/core/schedule/init.go b/core/schedule/init.go new file mode 100644 index 0000000..830f137 --- /dev/null +++ b/core/schedule/init.go @@ -0,0 +1,22 @@ +package schedule + +type Init struct { + notifyChan chan any +} + +// BuildTickChannel implements abstraction.Scheduler. +func (c *Init) BuildTickChannel() <-chan any { + c.notifyChan = make(chan any) + + go func() { + c.notifyChan <- false + c.Cancel() + }() + + return c.notifyChan +} + +// cancel implements abstraction.Scheduler. +func (c *Init) Cancel() { + close(c.notifyChan) +} diff --git a/core/schedule/interval.go b/core/schedule/interval.go index 002cd5d..4f33e49 100644 --- a/core/schedule/interval.go +++ b/core/schedule/interval.go @@ -4,17 +4,8 @@ import ( "time" "github.com/sirupsen/logrus" - - "github.com/FMotalleb/crontab-go/abstraction" ) -func test() { - sh := &Interval{} - tester(sh) -} - -func tester(sh abstraction.Scheduler) {} - type Interval struct { duration time.Duration logger *logrus.Entry @@ -35,14 +26,14 @@ func NewInterval(schedule time.Duration, logger *logrus.Entry) Interval { } } -// buildTickChannel implements abstraction.Scheduler. +// BuildTickChannel implements abstraction.Scheduler. func (c *Interval) BuildTickChannel() <-chan any { c.Cancel() c.notifyChan = make(chan any) c.ticker = time.NewTicker(c.duration) go func() { - c.notifyChan <- false + // c.notifyChan <- false for range c.ticker.C { c.notifyChan <- false } diff --git a/schema.json b/schema.json index f82dc4b..2c4bd2d 100644 --- a/schema.json +++ b/schema.json @@ -107,7 +107,17 @@ }, "interval": { "type": "string", - "description": "A string that represents the interval at which the job should be executed." + "description": "A string that represents the interval at which the job should be executed.", + "examples": [ + "1s", + "10m", + "1h", + "5h30m15s" + ] + }, + "on_init": { + "type": "boolean", + "description": "Indicates that the job should trigger its tasks once initialized." } }, "required": [],