Skip to content

Commit

Permalink
feat: docker event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
FMotalleb committed Jul 13, 2024
1 parent 9929da4 commit 657ab15
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 62 deletions.
26 changes: 8 additions & 18 deletions config.local.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
# yaml-language-server: $schema=schema.json
#TODO: unix/tcp socket controller
#TODO: prometheus exporter

jobs:
- name: Test Job
- name: echo
tasks:
- command: /bin/ps
retry-delay: 5s
retries: 0
connections:
# - container: 614674b79a1b
# volumes:
# - "/home/motalleb/Downloads:/var/local/test"
- image: alpine
env:
SHELL: /bin/sh
SHELL_ARGS: "-c"
- command: echo "received"
events:
- on-init: true
# - interval: 1s
- web-event: test
- web-event: test
- web-event: test
- docker:
connection: unix:///var/run/docker.sock
name: "^alpine"
image: "^alpine.*"
actions:
- start
37 changes: 29 additions & 8 deletions config/compiler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,57 @@
package cfgcompiler

import (
"time"

"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/config"
"github.com/FMotalleb/crontab-go/core/event"
"github.com/FMotalleb/crontab-go/core/utils"
)

func CompileEvent(sh *config.JobEvent, cr *cron.Cron, logger *logrus.Entry) abstraction.Event {
switch {
case sh.Cron != "":
events := event.NewCron(
event := event.NewCron(
sh.Cron,
cr,
logger,
)
return &events
return &event
case sh.WebEvent != "":
events := event.NewEventListener(sh.WebEvent)
return &events
event := event.NewEventListener(sh.WebEvent)
return &event
case sh.Interval != 0:
events := event.NewInterval(
event := event.NewInterval(
sh.Interval,
logger,
)
return &events
return &event

case sh.OnInit:
events := event.Init{}
return &events
event := event.Init{}
return &event

case sh.Docker != nil:
d := sh.Docker
con := utils.MayFirstNonZero(d.Connection,
"unix:///var/run/docker.sock",
)
event := event.NewDockerEvent(
con,
d.Name,
d.Image,
d.Actions,
d.Labels,
utils.MayFirstNonZero(d.ErrorLimit, 1),
utils.MayFirstNonZero(d.ErrorLimitPolicy, event.Reconnect),
utils.MayFirstNonZero(d.ErrorThrottle, time.Second*5),
logger,
)
return event
}

return nil
Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
import (
"time"

"github.com/FMotalleb/crontab-go/core/event"
"github.com/FMotalleb/crontab-go/enums"
)

Expand Down Expand Up @@ -47,6 +48,19 @@ type JobEvent struct {
Interval time.Duration `mapstructure:"interval" json:"interval,omitempty"`
OnInit bool `mapstructure:"on-init" json:"on-init,omitempty"`
WebEvent string `mapstructure:"web-event" json:"web-event,omitempty"`
Docker *DockerEvent `mapstructure:"docker" json:"docker,omitempty"`
}

// DockerEvent represents a Docker event configuration
type DockerEvent struct {
Connection string `mapstructure:"connection" json:"connection,omitempty"`
Name string `mapstructure:"name" json:"name,omitempty"`
Image string `mapstructure:"image" json:"image,omitempty"`
Actions []string `mapstructure:"actions" json:"actions,omitempty"`
Labels map[string]string `mapstructure:"labels" json:"labels,omitempty"`
ErrorLimit uint `mapstructure:"error-limit-count" json:"error-limit,omitempty"`
ErrorLimitPolicy event.ErrorLimitPolicy `mapstructure:"error-limit-policy" json:"error-limit-policy,omitempty"`
ErrorThrottle time.Duration `mapstructure:"error-throttle" json:"error-throttle,omitempty"`
}

// JobHooks represents the hooks configuration for a job.
Expand Down
128 changes: 118 additions & 10 deletions config/job_valdiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,63 @@ package config

import (
"fmt"
"regexp"

"github.com/docker/docker/api/types/events"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/core/event"
"github.com/FMotalleb/crontab-go/core/utils"
)

var acceptedActions = utils.NewList(
events.ActionCreate,
events.ActionStart,
events.ActionRestart,
events.ActionStop,
events.ActionCheckpoint,
events.ActionPause,
events.ActionUnPause,
events.ActionAttach,
events.ActionDetach,
events.ActionResize,
events.ActionUpdate,
events.ActionRename,
events.ActionKill,
events.ActionDie,
events.ActionOOM,
events.ActionDestroy,
events.ActionRemove,
events.ActionCommit,
events.ActionTop,
events.ActionCopy,
events.ActionArchivePath,
events.ActionExtractToDir,
events.ActionExport,
events.ActionImport,
events.ActionSave,
events.ActionLoad,
events.ActionTag,
events.ActionUnTag,
events.ActionPush,
events.ActionPull,
events.ActionPrune,
events.ActionDelete,
events.ActionEnable,
events.ActionDisable,
events.ActionConnect,
events.ActionDisconnect,
events.ActionReload,
events.ActionMount,
events.ActionUnmount,
events.ActionExecCreate,
events.ActionExecStart,
events.ActionExecDie,
events.ActionExecDetach,
events.ActionHealthStatus,
events.ActionHealthStatusRunning,
events.ActionHealthStatusHealthy,
events.ActionHealthStatusUnhealthy,
)

func (c *JobConfig) Validate(log *logrus.Entry) error {
Expand Down Expand Up @@ -82,28 +135,35 @@ func (s *JobEvent) Validate(log *logrus.Entry) error {
} else if _, err := event.CronParser.Parse(s.Cron); s.Cron != "" && err != nil {
log.WithError(err).Warn("Validation failed for JobEvent")
return err
} else if s.Docker != nil {
returnValue := dockerValidation(s, log)
if returnValue != nil {
return returnValue
}
}

// Check the active events to ensure only one of on_init, interval, or cron is set
events := []bool{
s.Interval != 0,
// Check the active events to ensure only one of on_init, interval, docker, or cron is set
events := utils.NewList(s.Interval != 0,
s.Cron != "",
s.WebEvent != "",
s.Docker != nil,
s.OnInit,
}
activeEvents := 0
for _, t := range events {
if t {
activeEvents++
)
activeEvents := utils.Fold(events, 0, func(c int, item bool) int {
if item {
return c + 1
}
}
return c
})

if activeEvents != 1 {
err := fmt.Errorf(
"a single event must have one of (on-init: true,interval,cron,web-event) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`)",
"a single event must have one of (on-init: true,interval,cron,web-event,docker) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`, docker: %v)",
s.OnInit,
s.Cron,
s.Interval,
s.WebEvent,
s.Docker,
)
log.WithError(err).Warn("Validation failed for JobEvent")
return err
Expand All @@ -113,3 +173,51 @@ func (s *JobEvent) Validate(log *logrus.Entry) error {
log.Tracef("Validation successful for JobEvent: %+v", s)
return nil
}

func dockerValidation(s *JobEvent, log *logrus.Entry) error {
// Check if regex matchers are valid
checkList := utils.NewList[string]()
checkList.Add(
s.Docker.Name,
s.Docker.Image,
)
for _, v := range s.Docker.Labels {
checkList.Add(v)
}
err := utils.Fold(checkList, nil, func(initial error, item string) error {
if initial != nil {
return initial
}
_, err := regexp.Compile(s.Docker.Name)
return err
})
if err != nil {
log.WithError(err).Warn("Validation failed for one of docker regex pattern (container name, image name, labels value)")
return err
}
for _, i := range s.Docker.Actions {
if !acceptedActions.Contains(events.Action(i)) {
err := fmt.Errorf("given action: %#v is not allowed", i)
log.WithError(err).Warn("Validation failed for one of docker actions")
return err
}
}
// Validating error handler parameters
if s.Docker.ErrorLimit > 0 {
log.Debug("error limit will be set to 1")
}
if s.Docker.ErrorLimitPolicy == "" {
log.Info("error limit set to non-zero number but no error policy was specified, using default policy (reconnect)")
}
if !utils.NewList("", event.GiveUp, event.Kill, event.Reconnect).Contains(s.Docker.ErrorLimitPolicy) {
err := fmt.Errorf("given error limit policy: %#v is not allowed, possible error policies are (give-up,kill,reconnect)", s.Docker.ErrorLimitPolicy)
log.WithError(err).Warn("Validation failed for docker error limit policy")
return err
}
if s.Docker.ErrorThrottle < 0 {
err := fmt.Errorf("received a negative throttle value: `%v`", s.Docker.ErrorThrottle)
log.WithError(err).Warn("Validation failed for docker, throttling value error")
return err
}
return nil
}
Loading

0 comments on commit 657ab15

Please sign in to comment.