Skip to content

Commit

Permalink
feat: per-task hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
FMotalleb committed Jun 8, 2024
1 parent e6d9e26 commit 74275aa
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 20 deletions.
2 changes: 2 additions & 0 deletions abstraction/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ import "context"
// Executable is an object that can be executed using a execute method and stopped using cancel method
type Executable interface {
Execute(context.Context) error
SetDoneHooks([]Executable)
SetFailHooks([]Executable)
Cancel()
}
23 changes: 19 additions & 4 deletions config/compiler/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,28 @@ func CompileScheduler(sh *config.JobScheduler, cr *cron.Cron, logger *logrus.Ent
}

func CompileTask(sh *config.Task, logger *logrus.Entry) abstraction.Executable {
var t abstraction.Executable
switch {
case sh.Command != "":
return task.NewCommand(sh, logger)
t = task.NewCommand(sh, logger)
case sh.Get != "":
return task.NewGet(sh, logger)
t = task.NewGet(sh, logger)
case sh.Post != "":
return task.NewPost(sh, logger)
t = task.NewPost(sh, logger)
default:
logger.Fatalln("cannot handle given task config", sh)
}
return nil

onDone := []abstraction.Executable{}
for _, d := range sh.OnDone {
onDone = append(onDone, CompileTask(&d, logger))
}
t.SetDoneHooks(onDone)
onFail := []abstraction.Executable{}
for _, d := range sh.OnFail {
onFail = append(onFail, CompileTask(&d, logger))
}
t.SetFailHooks(onFail)

return t
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ type (
RetryDelay time.Duration `mapstructure:"retry-delay" json:"retry_delay,omitempty"`
Timeout time.Duration `mapstructure:"timeout" json:"timeout,omitempty"`
Env EnvVariables `mapstructure:"env" json:"env,omitempty"`

OnDone []Task `mapstructure:"on-done" json:"on_done,omitempty"`
OnFail []Task `mapstructure:"on-fail" json:"on_fail,omitempty"`
}
)
10 changes: 10 additions & 0 deletions config/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ func (t *Task) Validate(log *logrus.Entry) error {
t,
)
}
for _, task := range t.OnDone {
if err := task.Validate(log); err != nil {
return err
}
}
for _, task := range t.OnFail {
if err := task.Validate(log); err != nil {
return err
}
}
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions core/os_credential/unix_credential.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func SetUser(log *logrus.Entry, proc *exec.Cmd, usr string, grp string) {
}

func lookupGID(grp string, log *logrus.Entry) (gid uint32, err error) {
if grp == "" {
return 0, nil
}
g, err := osUser.LookupGroup(grp)
if err != nil {
log.Panicf("cannot find group with name %s in the os: %s, you've changed os users during application runtime", grp, err)
Expand All @@ -64,6 +67,9 @@ func lookupGID(grp string, log *logrus.Entry) (gid uint32, err error) {
}

func lookupUIDAndGID(usr string, log *logrus.Entry) (uid uint32, gid uint32, err error) {
if usr == "" {
return 0, 0, nil
}
u, err := osUser.Lookup(usr)
if err != nil {
log.Panicf("cannot find user with name %s in the os: %s, you've changed os users during application runtime", usr, err)
Expand Down
16 changes: 16 additions & 0 deletions core/task/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ type Command struct {
retries uint
retryDelay time.Duration
timeout time.Duration

doneHooks []abstraction.Executable
failHooks []abstraction.Executable
}

// SetDoneHooks implements abstraction.Executable.
func (c *Command) SetDoneHooks(done []abstraction.Executable) {
c.doneHooks = done
}

// SetFailHooks implements abstraction.Executable.
func (c *Command) SetFailHooks(fail []abstraction.Executable) {
c.failHooks = fail
}

// Cancel implements abstraction.Executable.
Expand All @@ -49,6 +62,7 @@ func (c *Command) Execute(ctx context.Context) (e error) {
log := c.log.WithField("retry", r)
if getRetry(ctx) > c.retries {
log.Warn("maximum retry reached")
runTasks(c.failHooks)
return fmt.Errorf("maximum retries reached")
}
if r != 0 {
Expand Down Expand Up @@ -83,6 +97,8 @@ func (c *Command) Execute(ctx context.Context) (e error) {
log.Warn("failed to execute the command ", e)
return c.Execute(ctx)
}

runTasks(c.doneHooks)
return

Check failure on line 102 in core/task/command.go

View workflow job for this annotation

GitHub Actions / analyze (go)

naked return in func `Execute` with 43 lines of code (nakedret)
}

Expand Down
15 changes: 15 additions & 0 deletions core/task/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ type Get struct {
retries uint
retryDelay time.Duration
timeout time.Duration

doneHooks []abstraction.Executable
failHooks []abstraction.Executable
}

// SetDoneHooks implements abstraction.Executable.
func (g *Get) SetDoneHooks(done []abstraction.Executable) {
g.doneHooks = done
}

// SetFailHooks implements abstraction.Executable.
func (g *Get) SetFailHooks(fail []abstraction.Executable) {
g.failHooks = fail
}

// Cancel implements abstraction.Executable.
Expand All @@ -37,6 +50,7 @@ func (g *Get) Execute(ctx context.Context) (e error) {
log := g.log.WithField("retry", r)
if getRetry(ctx) > g.retries {
log.Warn("maximum retry reached")
runTasks(g.failHooks)
return fmt.Errorf("maximum retries reached")
}
if r != 0 {
Expand Down Expand Up @@ -80,6 +94,7 @@ func (g *Get) Execute(ctx context.Context) (e error) {
log.Warnln("request failed")
return g.Execute(ctx)
}
runTasks(g.doneHooks)
return
}

Expand Down
7 changes: 7 additions & 0 deletions core/task/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/ctxutils"
)

Expand Down Expand Up @@ -49,3 +50,9 @@ func (r *ResponseWriter) Write(p []byte) (n int, err error) {
func (r *ResponseWriter) String() string {
return string(r.buffer)
}

func runTasks(exec []abstraction.Executable) {
for _, t := range exec {
_ = t.Execute(context.Background())
}
}
48 changes: 32 additions & 16 deletions core/task/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,59 @@ type Post struct {
retries uint
retryDelay time.Duration
timeout time.Duration

doneHooks []abstraction.Executable
failHooks []abstraction.Executable
}

// SetDoneHooks implements abstraction.Executable.
func (p *Post) SetDoneHooks(done []abstraction.Executable) {
p.doneHooks = done
}

// SetFailHooks implements abstraction.Executable.
func (p *Post) SetFailHooks(fail []abstraction.Executable) {
p.failHooks = fail
}

// Cancel implements abstraction.Executable.
func (g *Post) Cancel() {
if g.cancel != nil {
g.log.Debugln("canceling get request")
g.cancel()
func (p *Post) Cancel() {
if p.cancel != nil {
p.log.Debugln("canceling get request")
p.cancel()
}
}

// Execute implements abstraction.Executable.
func (g *Post) Execute(ctx context.Context) (e error) {
func (p *Post) Execute(ctx context.Context) (e error) {
r := getRetry(ctx)
log := g.log.WithField("retry", r)
if getRetry(ctx) > g.retries {
log := p.log.WithField("retry", r)
if getRetry(ctx) > p.retries {
log.Warn("maximum retry reached")
runTasks(p.failHooks)
return fmt.Errorf("maximum retries reached")
}
if r != 0 {
log.Debugln("waiting", g.retryDelay, "before executing the next iteration after last fail")
time.Sleep(g.retryDelay)
log.Debugln("waiting", p.retryDelay, "before executing the next iteration after last fail")
time.Sleep(p.retryDelay)
}
ctx = increaseRetry(ctx)
// ctx := context.Background()
var localCtx context.Context
if g.timeout != 0 {
localCtx, g.cancel = context.WithTimeout(ctx, g.timeout)
if p.timeout != 0 {
localCtx, p.cancel = context.WithTimeout(ctx, p.timeout)
} else {
localCtx, g.cancel = context.WithCancel(ctx)
localCtx, p.cancel = context.WithCancel(ctx)
}
client := &http.Client{}
data, _ := json.Marshal(g.data)
data, _ := json.Marshal(p.data)

req, e := http.NewRequestWithContext(localCtx, "POST", g.address, bytes.NewReader(data))
req, e := http.NewRequestWithContext(localCtx, "POST", p.address, bytes.NewReader(data))
log.Debugln("sending get http request")
if e != nil {
return
}
for key, val := range *g.headers {
for key, val := range *p.headers {
req.Header.Add(key, val)
}

Expand All @@ -82,8 +96,10 @@ func (g *Post) Execute(ctx context.Context) (e error) {

if e != nil || res.StatusCode >= 400 {
log.Warnln("request failed")
return g.Execute(ctx)
return p.Execute(ctx)
}

runTasks(p.doneHooks)
return
}

Expand Down
14 changes: 14 additions & 0 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@
],
"description": "A string that represents the URL to be sent using the POST method."
},
"on-done": {
"type": "array",
"items": {
"$ref": "#/definitions/Task"
},
"description": "List of optional hooks"
},
"on-fail": {
"type": "array",
"items": {
"$ref": "#/definitions/Task"
},
"description": "Username that this command must run as. (root privilege needed)"
},
"data": {
"$ref": "#/definitions/Data",
"description": "A Data object that defines the data to be sent with the request."
Expand Down

0 comments on commit 74275aa

Please sign in to comment.