diff --git a/config.local.yaml b/config.local.yaml index 9f5b729..dca1b6b 100644 --- a/config.local.yaml +++ b/config.local.yaml @@ -1,24 +1,14 @@ # yaml-language-server: $schema=schema.json -#TODO: unix/tcp socket controller -#TODO: prometheus exporter jobs: - - name: Test Job + - name: echo tasks: - - command: /bin/ps - retry-delay: 5s - retries: 0 - connections: - # - container: 614674b79a1b - # volumes: - # - "/home/motalleb/Downloads:/var/local/test" - - image: alpine - env: - SHELL: /bin/sh - SHELL_ARGS: "-c" + - command: echo "received" events: - on-init: true - # - interval: 1s - - web-event: test - - web-event: test - - web-event: test + - docker: + connection: unix:///var/run/docker.sock + name: "^alpine" + image: "^alpine.*" + actions: + - start diff --git a/config/compiler/event.go b/config/compiler/event.go index 7d56105..7de8db1 100644 --- a/config/compiler/event.go +++ b/config/compiler/event.go @@ -2,36 +2,57 @@ package cfgcompiler import ( + "time" + "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" "github.com/FMotalleb/crontab-go/abstraction" "github.com/FMotalleb/crontab-go/config" "github.com/FMotalleb/crontab-go/core/event" + "github.com/FMotalleb/crontab-go/core/utils" ) func CompileEvent(sh *config.JobEvent, cr *cron.Cron, logger *logrus.Entry) abstraction.Event { switch { case sh.Cron != "": - events := event.NewCron( + event := event.NewCron( sh.Cron, cr, logger, ) - return &events + return &event case sh.WebEvent != "": - events := event.NewEventListener(sh.WebEvent) - return &events + event := event.NewEventListener(sh.WebEvent) + return &event case sh.Interval != 0: - events := event.NewInterval( + event := event.NewInterval( sh.Interval, logger, ) - return &events + return &event case sh.OnInit: - events := event.Init{} - return &events + event := event.Init{} + return &event + + case sh.Docker != nil: + d := sh.Docker + con := utils.MayFirstNonZero(d.Connection, + "unix:///var/run/docker.sock", + ) + event := event.NewDockerEvent( + con, + d.Name, + d.Image, + d.Actions, + d.Labels, + utils.MayFirstNonZero(d.ErrorLimit, 1), + utils.MayFirstNonZero(d.ErrorLimitPolicy, event.Reconnect), + utils.MayFirstNonZero(d.ErrorThrottle, time.Second*5), + logger, + ) + return event } return nil diff --git a/config/config.go b/config/config.go index 1fcff4b..94ecfa3 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ package config import ( "time" + "github.com/FMotalleb/crontab-go/core/event" "github.com/FMotalleb/crontab-go/enums" ) @@ -47,6 +48,19 @@ type JobEvent struct { Interval time.Duration `mapstructure:"interval" json:"interval,omitempty"` OnInit bool `mapstructure:"on-init" json:"on-init,omitempty"` WebEvent string `mapstructure:"web-event" json:"web-event,omitempty"` + Docker *DockerEvent `mapstructure:"docker" json:"docker,omitempty"` +} + +// DockerEvent represents a Docker event configuration +type DockerEvent struct { + Connection string `mapstructure:"connection" json:"connection,omitempty"` + Name string `mapstructure:"name" json:"name,omitempty"` + Image string `mapstructure:"image" json:"image,omitempty"` + Actions []string `mapstructure:"actions" json:"actions,omitempty"` + Labels map[string]string `mapstructure:"labels" json:"labels,omitempty"` + ErrorLimit uint `mapstructure:"error-limit-count" json:"error-limit,omitempty"` + ErrorLimitPolicy event.ErrorLimitPolicy `mapstructure:"error-limit-policy" json:"error-limit-policy,omitempty"` + ErrorThrottle time.Duration `mapstructure:"error-throttle" json:"error-throttle,omitempty"` } // JobHooks represents the hooks configuration for a job. diff --git a/config/job_valdiator.go b/config/job_valdiator.go index c9b8c01..52927ee 100644 --- a/config/job_valdiator.go +++ b/config/job_valdiator.go @@ -2,10 +2,63 @@ package config import ( "fmt" + "regexp" + "github.com/docker/docker/api/types/events" "github.com/sirupsen/logrus" "github.com/FMotalleb/crontab-go/core/event" + "github.com/FMotalleb/crontab-go/core/utils" +) + +var acceptedActions = utils.NewList( + events.ActionCreate, + events.ActionStart, + events.ActionRestart, + events.ActionStop, + events.ActionCheckpoint, + events.ActionPause, + events.ActionUnPause, + events.ActionAttach, + events.ActionDetach, + events.ActionResize, + events.ActionUpdate, + events.ActionRename, + events.ActionKill, + events.ActionDie, + events.ActionOOM, + events.ActionDestroy, + events.ActionRemove, + events.ActionCommit, + events.ActionTop, + events.ActionCopy, + events.ActionArchivePath, + events.ActionExtractToDir, + events.ActionExport, + events.ActionImport, + events.ActionSave, + events.ActionLoad, + events.ActionTag, + events.ActionUnTag, + events.ActionPush, + events.ActionPull, + events.ActionPrune, + events.ActionDelete, + events.ActionEnable, + events.ActionDisable, + events.ActionConnect, + events.ActionDisconnect, + events.ActionReload, + events.ActionMount, + events.ActionUnmount, + events.ActionExecCreate, + events.ActionExecStart, + events.ActionExecDie, + events.ActionExecDetach, + events.ActionHealthStatus, + events.ActionHealthStatusRunning, + events.ActionHealthStatusHealthy, + events.ActionHealthStatusUnhealthy, ) func (c *JobConfig) Validate(log *logrus.Entry) error { @@ -82,28 +135,35 @@ func (s *JobEvent) Validate(log *logrus.Entry) error { } else if _, err := event.CronParser.Parse(s.Cron); s.Cron != "" && err != nil { log.WithError(err).Warn("Validation failed for JobEvent") return err + } else if s.Docker != nil { + returnValue := dockerValidation(s, log) + if returnValue != nil { + return returnValue + } } - // Check the active events to ensure only one of on_init, interval, or cron is set - events := []bool{ - s.Interval != 0, + // Check the active events to ensure only one of on_init, interval, docker, or cron is set + events := utils.NewList(s.Interval != 0, s.Cron != "", s.WebEvent != "", + s.Docker != nil, s.OnInit, - } - activeEvents := 0 - for _, t := range events { - if t { - activeEvents++ + ) + activeEvents := utils.Fold(events, 0, func(c int, item bool) int { + if item { + return c + 1 } - } + return c + }) + if activeEvents != 1 { err := fmt.Errorf( - "a single event must have one of (on-init: true,interval,cron,web-event) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`)", + "a single event must have one of (on-init: true,interval,cron,web-event,docker) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`, docker: %v)", s.OnInit, s.Cron, s.Interval, s.WebEvent, + s.Docker, ) log.WithError(err).Warn("Validation failed for JobEvent") return err @@ -113,3 +173,51 @@ func (s *JobEvent) Validate(log *logrus.Entry) error { log.Tracef("Validation successful for JobEvent: %+v", s) return nil } + +func dockerValidation(s *JobEvent, log *logrus.Entry) error { + // Check if regex matchers are valid + checkList := utils.NewList[string]() + checkList.Add( + s.Docker.Name, + s.Docker.Image, + ) + for _, v := range s.Docker.Labels { + checkList.Add(v) + } + err := utils.Fold(checkList, nil, func(initial error, item string) error { + if initial != nil { + return initial + } + _, err := regexp.Compile(s.Docker.Name) + return err + }) + if err != nil { + log.WithError(err).Warn("Validation failed for one of docker regex pattern (container name, image name, labels value)") + return err + } + for _, i := range s.Docker.Actions { + if !acceptedActions.Contains(events.Action(i)) { + err := fmt.Errorf("given action: %#v is not allowed", i) + log.WithError(err).Warn("Validation failed for one of docker actions") + return err + } + } + // Validating error handler parameters + if s.Docker.ErrorLimit > 0 { + log.Debug("error limit will be set to 1") + } + if s.Docker.ErrorLimitPolicy == "" { + log.Info("error limit set to non-zero number but no error policy was specified, using default policy (reconnect)") + } + if !utils.NewList("", event.GiveUp, event.Kill, event.Reconnect).Contains(s.Docker.ErrorLimitPolicy) { + err := fmt.Errorf("given error limit policy: %#v is not allowed, possible error policies are (give-up,kill,reconnect)", s.Docker.ErrorLimitPolicy) + log.WithError(err).Warn("Validation failed for docker error limit policy") + return err + } + if s.Docker.ErrorThrottle < 0 { + err := fmt.Errorf("received a negative throttle value: `%v`", s.Docker.ErrorThrottle) + log.WithError(err).Warn("Validation failed for docker, throttling value error") + return err + } + return nil +} diff --git a/core/event/docker.go b/core/event/docker.go new file mode 100644 index 0000000..af6bef9 --- /dev/null +++ b/core/event/docker.go @@ -0,0 +1,149 @@ +package event + +import ( + "context" + "regexp" + "time" + + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/client" + "github.com/sirupsen/logrus" + + "github.com/FMotalleb/crontab-go/core/concurrency" + "github.com/FMotalleb/crontab-go/core/utils" +) + +type ErrorLimitPolicy string + +const ( + Kill ErrorLimitPolicy = "kill" + GiveUp ErrorLimitPolicy = "give-up" + Reconnect ErrorLimitPolicy = "reconnect" +) + +type DockerEvent struct { + connection string + containerMatcher regexp.Regexp + imageMatcher regexp.Regexp + actions *utils.List[events.Action] + labels map[string]regexp.Regexp + errorThreshold uint + errorPolicy ErrorLimitPolicy + errorThrottle time.Duration + log *logrus.Entry +} + +func NewDockerEvent( + connection string, + containerMatcher string, + imageMatcher string, + actions []string, + labels map[string]string, + errorLimit uint, + errorPolicy ErrorLimitPolicy, + errorThrottle time.Duration, + logger *logrus.Entry, +) *DockerEvent { + return &DockerEvent{ + connection: connection, + containerMatcher: *regexp.MustCompile(containerMatcher), + imageMatcher: *regexp.MustCompile(imageMatcher), + actions: toAction(logger, actions), + labels: reshapeLabelMatcher(labels), + errorThreshold: errorLimit, + errorPolicy: errorPolicy, + errorThrottle: errorThrottle, + log: logger, + } +} + +// BuildTickChannel implements abstraction.Scheduler. +func (de *DockerEvent) BuildTickChannel() <-chan any { + notifyChan := make(chan any) + + cli, err := client.NewClientWithOpts( + client.WithHost(de.connection), + ) + if err != nil { + de.log.Warn("failed to connect to docker: ", err) + return de.BuildTickChannel() + } + go func() { + ctx := context.Background() + msg, err := cli.Events(ctx, events.ListOptions{}) + errCount := concurrency.NewLockedValue(uint(0)) + for { + select { + case err := <-err: + de.log.WithError(err).Warn("received an error from docker: ", err) + if de.errorThreshold == 0 { + continue + } + errs := errCount.Get() + 1 + errCount.Set(errs) + + if errs >= de.errorThreshold { + switch de.errorPolicy { + case GiveUp: + de.log.Warnf("Received more than %d consecutive errors from docker, marking instance as unstable and giving up this instance, no events will be received anymore", errs) + close(notifyChan) + return + case Kill: + de.log.Fatalf("Received more than %d consecutive errors from docker, marking instance as unstable and killing in return, this may happen due to dockerd restarting", errs) + case Reconnect: + de.log.Fatalf("Received more than %d consecutive errors from docker, marking instance as unstable and retry connecting to docker", errs) + default: + de.log.Fatalf("unexpected event.ErrorLimitPolicy: %#v, valid options are (kill,giv-up,reconnect)", de.errorPolicy) + } + } + if de.errorThrottle > 0 { + time.Sleep(de.errorThrottle) + } + case event := <-msg: + de.log.Trace("received an event from docker: ", event) + if de.matches(&event) { + notifyChan <- false + } + errCount.Set(0) + } + } + }() + + return notifyChan +} + +func (de *DockerEvent) matches(msg *events.Message) bool { + if de.actions.IsNotEmpty() && !de.actions.Contains(msg.Action) { + return false + } + if !de.containerMatcher.MatchString(msg.Actor.Attributes["name"]) { + return false + } + + if !de.imageMatcher.MatchString(msg.Actor.Attributes["image"]) { + return false + } + + for k, matcher := range de.labels { + if attrib, ok := msg.Actor.Attributes[k]; !ok && !matcher.MatchString(attrib) { + return false + } + } + return true +} + +func reshapeLabelMatcher(labels map[string]string) map[string]regexp.Regexp { + res := make(map[string]regexp.Regexp) + for k, v := range labels { + res[k] = *regexp.MustCompile(v) + } + return res +} + +func toAction(log *logrus.Entry, acts []string) *utils.List[events.Action] { + actions := utils.NewList[events.Action]() + for _, act := range acts { + actions.Add(events.Action(act)) + } + return actions +} diff --git a/core/goutils/goutils.go b/core/goutils/goutils.go deleted file mode 100644 index 1b9cce8..0000000 --- a/core/goutils/goutils.go +++ /dev/null @@ -1,14 +0,0 @@ -// Package goutils provides utility functions to handle goroutines and/or channels easily. -package goutils - -func ZipChannels[T interface{}](channels ...<-chan T) <-chan T { - output := make(chan T) - for _, ch := range channels { - go func() { - for i := range ch { - output <- i - } - }() - } - return output -} diff --git a/core/jobs/initializer.go b/core/jobs/initializer.go index 2bb8acf..271db91 100644 --- a/core/jobs/initializer.go +++ b/core/jobs/initializer.go @@ -8,7 +8,7 @@ import ( "github.com/FMotalleb/crontab-go/abstraction" "github.com/FMotalleb/crontab-go/config" cfgcompiler "github.com/FMotalleb/crontab-go/config/compiler" - "github.com/FMotalleb/crontab-go/core/goutils" + "github.com/FMotalleb/crontab-go/core/utils" ) func initEventSignal(events []abstraction.Event, logger *logrus.Entry) <-chan any { @@ -17,7 +17,7 @@ func initEventSignal(events []abstraction.Event, logger *logrus.Entry) <-chan an signals = append(signals, sh.BuildTickChannel()) } logger.Trace("Signals Built") - signal := goutils.ZipChannels(signals...) + signal := utils.ZipChannels(signals...) return signal } diff --git a/schema.json b/schema.json index 0d93057..0806d9a 100644 --- a/schema.json +++ b/schema.json @@ -121,6 +121,124 @@ "web-event": { "type": "string", "description": "Event name that received from webserver that should trigger this job." + }, + "docker": { + "type": "object", + "additionalProperties": false, + "properties": { + "connection": { + "type": "string", + "examples": [ + "unix:///var/run/docker.sock" + ], + "description": "Docker connection string (context), defaults to `unix:///var/run/docker.sock`." + }, + "name": { + "type": "string", + "examples": [ + "nginx", + ".*nginx.*", + "^project.*" + ], + "description": "Name matcher of the container, supports regex." + }, + "image": { + "type": "string", + "examples": [ + "redis:7.2.1-alpine", + "redis:.*" + ], + "description": "Name matcher of the image, supports regex." + }, + "actions": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "create", + "start", + "restart", + "stop", + "checkpoint", + "pause", + "unpause", + "attach", + "detach", + "resize", + "update", + "rename", + "kill", + "die", + "oom", + "destroy", + "remove", + "commit", + "top", + "copy", + "archive-path", + "extract-to-dir", + "export", + "import", + "save", + "load", + "tag", + "untag", + "push", + "pull", + "prune", + "delete", + "enable", + "disable", + "connect", + "disconnect", + "reload", + "mount", + "unmount", + "exec_create", + "exec_start", + "exec_die", + "exec_detach", + "health_status", + "health_status: running", + "health_status: healthy", + "health_status: unhealthy" + ] + }, + "description": "What happened to the container" + }, + "labels": { + "type": "object", + "description": "Labels of the affected container, supports `key: regex value`." + }, + "error-limit-count": { + "type": "integer", + "validate": { + "minimum": 1 + }, + "description": "How many consecutive errors must happen before enforcing the error-limit-policy" + }, + "error-limit-policy": { + "type": "integer", + "enum": [ + "kill", + "give-up", + "reconnect" + ], + "description": "What should happen after error limit is reached, (kill: kills the crontab-go, give-up: ignore error and disconnect from the instance and ignore this instance for the rest of the process, reconnect: reconnect: (default behavior) will attempt to reconnect to the instance)" + }, + "error-throttle": { + "type": "string", + "description": "Wait time after an error happens (events from that docker instance in this time will be ignored).", + "examples": [ + "1s", + "10m", + "1h", + "3.5h", + "5h30m15s" + ] + } + }, + "description": "Listen for docker events" } }, "required": [], @@ -163,7 +281,7 @@ "description": "Groupname that this command must run as. (root privilege needed)" }, "env": { - "$ref": "#/definitions/Env", + "$ref": "#/definitions/Map", "description": "An Env object that defines the environment variables for the task." }, "get": { @@ -178,7 +296,7 @@ "headers": { "type": "array", "items": { - "$ref": "#/definitions/Header" + "$ref": "#/definitions/Map" }, "description": "An array of Header objects that define the headers to be sent with the request." }, @@ -265,17 +383,11 @@ "required": [], "title": "Data" }, - "Env": { - "type": "object", - "additionalProperties": true, - "properties": {}, - "title": "Env" - }, - "Header": { + "Map": { "type": "object", "additionalProperties": true, "properties": {}, - "title": "Header" + "title": "Map" } } } \ No newline at end of file