Skip to content

Commit

Permalink
Merge pull request #37 from FMotalleb/feat/logfile-watcher
Browse files Browse the repository at this point in the history
Feat: logfile-watcher, event data to task
  • Loading branch information
FMotalleb authored Sep 1, 2024
2 parents 321be88 + 311997f commit 9b6f218
Show file tree
Hide file tree
Showing 28 changed files with 293 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ linters-settings:
linters:
disable-all: true
enable:
- bodyclose
# - bodyclose
- dogsled
- dupl
- errcheck
Expand Down
4 changes: 3 additions & 1 deletion abstraction/event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Package abstraction must contain only interfaces and abstract layers of modules
package abstraction

type EventChannel = <-chan []string

// Event is an object that can be executed using a execute method and stopped using cancel method
type Event interface {
BuildTickChannel() <-chan any
BuildTickChannel() EventChannel
}
6 changes: 4 additions & 2 deletions config.local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
jobs:
- name: echo
tasks:
- command: env
- command: echo "$0 $@"
env:
"COLE": test
events:
- on-init: true
- log-file: /home/motalleb/Documents/GitHub/crontab-go/test
log-check-cycle: 1s
log-matcher: .*(?<id>\d{2}).*
26 changes: 20 additions & 6 deletions config/compiler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,35 @@ func CompileEvent(sh *config.JobEvent, cr *cron.Cron, logger *logrus.Entry) abst

case sh.Docker != nil:
d := sh.Docker
con := utils.MayFirstNonZero(d.Connection,
con := utils.FirstNonZeroForced(d.Connection,
"unix:///var/run/docker.sock",
)
event := event.NewDockerEvent(
e := event.NewDockerEvent(
con,
d.Name,
d.Image,
d.Actions,
d.Labels,
utils.MayFirstNonZero(d.ErrorLimit, 1),
utils.MayFirstNonZero(d.ErrorLimitPolicy, event.Reconnect),
utils.MayFirstNonZero(d.ErrorThrottle, time.Second*5),
utils.FirstNonZeroForced(d.ErrorLimit, 1),
utils.FirstNonZeroForced(d.ErrorLimitPolicy, event.Reconnect),
utils.FirstNonZeroForced(d.ErrorThrottle, time.Second*5),
logger,
)
return event
return e
case sh.LogFile != "":
e, err := event.NewLogFile(
sh.LogFile,
sh.LogLineBreaker,
sh.LogMatcher,
sh.LogCheckCycle,
logger,
)
if err != nil {
logger.Error("Error creating LogFile: ", err)
return nil
}
return e

}

return nil
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type JobEvent struct {
OnInit bool `mapstructure:"on-init" json:"on-init,omitempty"`
WebEvent string `mapstructure:"web-event" json:"web-event,omitempty"`
Docker *DockerEvent `mapstructure:"docker" json:"docker,omitempty"`

LogFile string `mapstructure:"log-file" json:"log-file,omitempty"`
LogCheckCycle time.Duration `mapstructure:"log-check-cycle" json:"log-check-cycle,omitempty"`
LogLineBreaker string `mapstructure:"log-line-breaker" json:"log-line-breaker,omitempty"`
LogMatcher string `mapstructure:"log-matcher" json:"log-matcher,omitempty"`
}

// DockerEvent represents a Docker event configuration
Expand Down
4 changes: 3 additions & 1 deletion config/job_valdiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *JobEvent) Validate(log *logrus.Entry) error {
s.Cron != "",
s.WebEvent != "",
s.Docker != nil,
s.LogFile != "",
s.OnInit,
)
activeEvents := utils.Fold(events, 0, func(c int, item bool) int {
Expand All @@ -158,12 +159,13 @@ func (s *JobEvent) Validate(log *logrus.Entry) error {

if activeEvents != 1 {
err := fmt.Errorf(
"a single event must have one of (on-init: true,interval,cron,web-event,docker) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`, docker: %v)",
"a single event must have one of (on-init: true,interval,cron,web-event,docker,log-file) field, received:(on_init: %t,cron: `%s`, interval: `%s`, web_event: `%s`, docker: %v,log-file: %v)",
s.OnInit,
s.Cron,
s.Interval,
s.WebEvent,
s.Docker,
s.LogFile,
)
log.WithError(err).Warn("Validation failed for JobEvent")
return err
Expand Down
10 changes: 9 additions & 1 deletion core/cmd_connection/docker_attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/config"
"github.com/FMotalleb/crontab-go/ctxutils"
)

type DockerAttachConnection struct {
Expand Down Expand Up @@ -57,9 +58,16 @@ func (d *DockerAttachConnection) Prepare(ctx context.Context, task *config.Task)
d.log.Debug("No explicit docker connection specified, using default: `unix:///var/run/docker.sock`")
d.conn.DockerConnection = "unix:///var/run/docker.sock"
}
params := ctx.Value(ctxutils.EventData).([]string)
cmd := append(
[]string{shell},
append(shellArgs, task.Command)...,
append(
shellArgs,
append(
[]string{task.Command},
params...,
)...,
)...,
)
// Create an exec configuration
d.execCFG = &container.ExecOptions{
Expand Down
15 changes: 12 additions & 3 deletions core/cmd_connection/docker_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/config"
"github.com/FMotalleb/crontab-go/ctxutils"
"github.com/FMotalleb/crontab-go/helpers"
)

Expand Down Expand Up @@ -58,9 +59,17 @@ func (d *DockerCreateConnection) Prepare(ctx context.Context, task *config.Task)
d.log.Debug("No explicit docker connection specified, using default: `unix:///var/run/docker.sock`")
d.conn.DockerConnection = "unix:///var/run/docker.sock"
}

params := ctx.Value(ctxutils.EventData).([]string)
cmd := append(
[]string{shell},
append(shellArgs, task.Command)...,
append(
shellArgs,
append(
[]string{task.Command},
params...,
)...,
)...,
)
volumes := make(map[string]struct{})
for _, volume := range d.conn.Volumes {
Expand Down Expand Up @@ -128,7 +137,7 @@ func (d *DockerCreateConnection) Execute() ([]byte, error) {
if err != nil {
return nil, err
}
defer helpers.WarnOnErr(
defer helpers.WarnOnErrIgnored(
d.log,
func() error {
return d.cli.ContainerRemove(ctx, exec.ID,
Expand Down Expand Up @@ -181,7 +190,7 @@ func (d *DockerCreateConnection) Execute() ([]byte, error) {
if err != nil {
return nil, err
}
defer helpers.WarnOnErr(
defer helpers.WarnOnErrIgnored(
d.log,
func() error {
return resp.Close()
Expand Down
10 changes: 9 additions & 1 deletion core/cmd_connection/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/FMotalleb/crontab-go/abstraction"
"github.com/FMotalleb/crontab-go/config"
credential "github.com/FMotalleb/crontab-go/core/os_credential"
"github.com/FMotalleb/crontab-go/ctxutils"
)

// Local represents a local command connection.
Expand Down Expand Up @@ -43,10 +44,17 @@ func (l *Local) Prepare(ctx context.Context, task *config.Task) error {
return fmt.Errorf("cannot get current working directory: %s", e)
}
}
params := ctx.Value(ctxutils.EventData).([]string)
l.cmd = exec.CommandContext(
ctx,
shell,
append(shellArgs, task.Command)...,
append(
shellArgs,
append(
[]string{task.Command},
params...,
)...,
)...,
)
l.log = l.log.WithFields(
logrus.Fields{
Expand Down
28 changes: 20 additions & 8 deletions core/event/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ type Cron struct {
cronSchedule string
logger *logrus.Entry
cron *cron.Cron
notifyChan chan any
entry *cron.EntryID
}

Expand All @@ -31,22 +30,35 @@ func NewCron(schedule string, c *cron.Cron, logger *logrus.Entry) Cron {
}

// BuildTickChannel implements abstraction.Scheduler.
func (c *Cron) BuildTickChannel() <-chan any {
func (c *Cron) BuildTickChannel() <-chan []string {
if c.entry != nil {
c.logger.Fatal("already built the ticker channel")
}
c.notifyChan = make(chan any)
notifyChan := make(chan []string)
schedule, err := CronParser.Parse(c.cronSchedule)
if err != nil {
c.logger.Warnln("cannot initialize cron: ", err)
} else {
entry := c.cron.Schedule(schedule, c)
entry := c.cron.Schedule(
schedule,
&cronJob{
logger: c.logger,
scheduler: c.cronSchedule,
notify: notifyChan,
},
)
c.entry = &entry
}
return c.notifyChan
return notifyChan
}

func (c *Cron) Run() {
c.logger.Debugln("cron tick received")
c.notifyChan <- false
type cronJob struct {
logger *logrus.Entry
scheduler string
notify chan<- []string
}

func (j *cronJob) Run() {
j.logger.Debugln("cron tick received")
j.notify <- []string{"cron", j.scheduler}
}
20 changes: 15 additions & 5 deletions core/event/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package event

import (
"context"
"fmt"
"regexp"
"strings"
"time"

"github.com/docker/docker/api/types/events"
Expand Down Expand Up @@ -58,8 +60,8 @@ func NewDockerEvent(
}

// BuildTickChannel implements abstraction.Scheduler.
func (de *DockerEvent) BuildTickChannel() <-chan any {
notifyChan := make(chan any)
func (de *DockerEvent) BuildTickChannel() <-chan []string {
notifyChan := make(chan []string)

cli, err := client.NewClientWithOpts(
client.WithHost(de.connection),
Expand Down Expand Up @@ -92,8 +94,8 @@ func (de *DockerEvent) BuildTickChannel() <-chan any {
de.log.Fatalf("Received more than %d consecutive errors from docker, marking instance as unstable and killing in return, this may happen due to dockerd restarting", errs)
case Reconnect:
de.log.Warnf("Received more than %d consecutive errors from docker, marking instance as unstable and retry connecting to docker", errs)
for range de.BuildTickChannel() {
notifyChan <- nil
for e := range de.BuildTickChannel() {
notifyChan <- e
}
default:
de.log.Fatalf("unexpected event.ErrorLimitPolicy: %#v, valid options are (kill,giv-up,reconnect)", de.errorPolicy)
Expand All @@ -106,7 +108,7 @@ func (de *DockerEvent) BuildTickChannel() <-chan any {
case event := <-msg:
de.log.Trace("received an event from docker: ", event)
if de.matches(&event) {
notifyChan <- nil
notifyChan <- []string{"docker", event.Scope, string(event.Action), event.Actor.ID, strings.Join(reshapeAttrib(event.Actor.Attributes), ",")}
}
errCount.Set(0)
}
Expand Down Expand Up @@ -144,6 +146,14 @@ func reshapeLabelMatcher(labels map[string]string) map[string]regexp.Regexp {
return res
}

func reshapeAttrib(input map[string]string) []string {
res := make([]string, 0, len(input))
for k, v := range input {
res = append(res, fmt.Sprintf("%s=%s", k, v))
}
return res
}

func toAction(acts []string) *utils.List[events.Action] {
actions := utils.NewList[events.Action]()
for _, act := range acts {
Expand Down
6 changes: 3 additions & 3 deletions core/event/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package event
type Init struct{}

// BuildTickChannel implements abstraction.Scheduler.
func (c *Init) BuildTickChannel() <-chan any {
notifyChan := make(chan any)
func (c *Init) BuildTickChannel() <-chan []string {
notifyChan := make(chan []string)

go func() {
notifyChan <- false
notifyChan <- []string{"init"}
close(notifyChan)
}()

Expand Down
18 changes: 9 additions & 9 deletions core/event/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
)

type Interval struct {
duration time.Duration
logger *logrus.Entry
ticker *time.Ticker
notifyChan chan any
duration time.Duration
logger *logrus.Entry
ticker *time.Ticker
}

func NewInterval(schedule time.Duration, logger *logrus.Entry) Interval {
Expand All @@ -27,18 +26,19 @@ func NewInterval(schedule time.Duration, logger *logrus.Entry) Interval {
}

// BuildTickChannel implements abstraction.Scheduler.
func (c *Interval) BuildTickChannel() <-chan any {
func (c *Interval) BuildTickChannel() <-chan []string {
if c.ticker != nil {
c.logger.Fatal("already built the ticker channel")
}
c.notifyChan = make(chan any)
notifyChan := make(chan []string)
c.ticker = time.NewTicker(c.duration)
go func() {
// c.notifyChan <- false
for range c.ticker.C {
c.notifyChan <- false

for i := range c.ticker.C {
notifyChan <- []string{"interval", c.duration.String(), i.Format(time.RFC3339)}
}
}()

return c.notifyChan
return notifyChan
}
Loading

0 comments on commit 9b6f218

Please sign in to comment.