Skip to content

Commit

Permalink
provide 'when.once' and 'when.each' instead of 'when.event' (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored Apr 21, 2017
1 parent ac3cc21 commit e342282
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 52 deletions.
10 changes: 5 additions & 5 deletions config/testdata/test.json5
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
exec: "/bin/serviceA",
when: {
source: "preStart",
event: "exitSuccess"
once: "exitSuccess"
},
health: {
exec: "/bin/to/healthcheck/for/service/A.sh",
Expand Down Expand Up @@ -53,31 +53,31 @@
exec: ["/bin/to/preStop.sh","arg1","arg2"],
when: {
source: "serviceA",
event: "stopping"
once: "stopping"
}
},
{
name: "postStop",
exec: ["/bin/to/postStop.sh"],
when: {
source: "serviceA",
event: "stopped"
once: "stopped"
}
},
{
name: "onChange-upstreamA",
exec: ["/bin/onChangeA.sh"],
when: {
source: "watch.upstreamA",
event: "changed"
each: "changed"
}
},
{
name: "onChange-upstreamB",
exec: ["/bin/onChangeB.sh"],
when: {
source: "watch.upstreamB",
event: "healthy"
each: "healthy"
}
}
],
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/fixtures/app/containerpilot.json5
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
port: 8000,
when: {
source: "preStart",
event: "exitSuccess"
once: "exitSuccess"
},
exec: [
"/usr/local/bin/node",
Expand All @@ -30,15 +30,15 @@
name: "reload-for-nginx",
when: {
source: "watch.nginx",
event: "changed"
each: "changed"
},
exec: "/reload-app.sh"
},
{
name: "reload-for-app",
when: {
source: "watch.app",
event: "changed"
each: "changed"
},
exec: "/reload-app.sh"
}
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/fixtures/nginx/etc/nginx-with-consul.json5
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
interfaces: ["eth0"],
when: {
source: "preStart",
event: "exitSuccess",
once: "exitSuccess",
},
exec: "nginx",
health: {
Expand All @@ -30,7 +30,7 @@
name: "onChange",
when: {
source: "watch.app",
event: "changed"
each: "changed"
},
exec: [
"consul-template", "-once", "-consul", "consul:8500", "-template",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
port: 8000,
when: {
source: "preStart",
event: "exitSuccess"
once: "exitSuccess"
},
exec: [
"/usr/local/bin/node",
Expand All @@ -30,15 +30,15 @@
name: "reload-for-nginx",
when: {
source: "watch.nginx",
event: "changed"
each: "changed"
},
exec: "/reload-app.sh"
},
{
name: "reload-for-app",
when: {
source: "watch.app",
event: "changed"
each: "changed"
},
exec: "/reload-app.sh"
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/tests/test_tasks/containerpilot.json5
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
port: 8000,
when: {
source: "preStart",
event: "exitSuccess"
once: "exitSuccess"
},
exec: [
"/usr/local/bin/node",
Expand Down
25 changes: 20 additions & 5 deletions jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
When *WhenConfig `mapstructure:"when"`
whenEvent events.Event
whenTimeout time.Duration
whenStartsLimit int
stoppingWaitEvent events.Event
}

Expand All @@ -56,7 +57,8 @@ type Config struct {
type WhenConfig struct {
Frequency string `mapstructure:"interval"`
Source string `mapstructure:"source"`
Event string `mapstructure:"event"`
Once string `mapstructure:"once"`
Each string `mapstructure:"each"`
Timeout string `mapstructure:"timeout"`
}

Expand Down Expand Up @@ -150,10 +152,14 @@ func (cfg *Config) validateWhen() error {
cfg.When = &WhenConfig{} // give us a safe zero-value
cfg.whenTimeout = time.Duration(0)
cfg.whenEvent = events.GlobalStartup
cfg.whenStartsLimit = 1
return nil
}
if cfg.When.Frequency != "" && cfg.When.Event != "" {
return fmt.Errorf("job[%s].when can have an 'interval' or an 'event' but not both",

if (cfg.When.Frequency != "" && cfg.When.Once != "") ||
(cfg.When.Frequency != "" && cfg.When.Each != "") ||
(cfg.When.Once != "" && cfg.When.Each != "") {
return fmt.Errorf("job[%s].when can have only one of 'interval', 'once', or 'each'",
cfg.Name)
}
if cfg.When.Frequency != "" {
Expand All @@ -175,6 +181,7 @@ func (cfg *Config) validateFrequency() error {
cfg.freqInterval = freq
cfg.whenTimeout = time.Duration(0)
cfg.whenEvent = events.GlobalStartup
cfg.whenStartsLimit = 1
return nil
}

Expand All @@ -186,7 +193,15 @@ func (cfg *Config) validateWhenEvent() error {
cfg.Name, err)
}
cfg.whenTimeout = whenTimeout
eventCode, err := events.FromString(cfg.When.Event)

var eventCode events.EventCode
if cfg.When.Once != "" {
eventCode, err = events.FromString(cfg.When.Once)
cfg.whenStartsLimit = 1
} else {
eventCode, err = events.FromString(cfg.When.Each)
cfg.whenStartsLimit = unlimited
}
if err != nil {
return fmt.Errorf("unable to parse job[%s].when.event: %v",
cfg.Name, err)
Expand Down Expand Up @@ -298,7 +313,7 @@ func (cfg *Config) validateRestarts() error {
switch t := cfg.Restarts.(type) {
case string:
if t == "unlimited" {
cfg.restartLimit = unlimitedRestarts
cfg.restartLimit = unlimited
} else if t == "never" {
cfg.restartLimit = 0
} else if i, err := strconv.Atoi(t); err == nil && i >= 0 {
Expand Down
4 changes: 2 additions & 2 deletions jobs/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func TestJobConfigSmokeTest(t *testing.T) {
job5 := jobs[5]
assert.Equal(t, job5.Name, "preStop", "expected '%v' for job5.Name but got '%v'")
assert.Equal(t, job5.Port, 0, "expected '%v' for job5.Port but got '%v'")
assert.Equal(t, job5.When, &WhenConfig{Source: "serviceA", Event: "stopping"},
assert.Equal(t, job5.When, &WhenConfig{Source: "serviceA", Once: "stopping"},
"expected '%v' for job5.When but got '%v'")
assert.Equal(t, job5.Restarts, nil, "expected '%v' for job5.Restarts but got '%v'")

job6 := jobs[6]
assert.Equal(t, job6.Name, "postStop", "expected '%v' for job6.Name but got '%v'")
assert.Equal(t, job6.Port, 0, "expected '%v' for job6.Port but got '%v'")
assert.Equal(t, job6.When, &WhenConfig{Source: "serviceA", Event: "stopped"},
assert.Equal(t, job6.When, &WhenConfig{Source: "serviceA", Once: "stopped"},
"expected '%v' for job6.When but got '%v'")
assert.Equal(t, job6.Restarts, nil, "expected '%v' for job6.Restarts but got '%v'")
}
Expand Down
35 changes: 22 additions & 13 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

// Some magic numbers used internally by restart limits
const (
unlimitedRestarts = -1
eventBufferSize = 1000
unlimited = -1
eventBufferSize = 1000
)

// Job manages the state of a job and its start/stop conditions
Expand All @@ -27,9 +27,12 @@ type Job struct {
healthCheckExec *commands.Command
healthCheckName string

// related events
whenEvent events.Event
whenTimeout time.Duration
// starting events
startEvent events.Event
startTimeout time.Duration
startsRemain int

// stopping events
stoppingWaitEvent events.Event
stoppingTimeout time.Duration

Expand All @@ -51,8 +54,9 @@ func NewJob(cfg *Config) *Job {
discoveryCatalog: cfg.discoveryCatalog,
Service: cfg.definition,
healthCheckExec: cfg.healthCheckExec,
whenEvent: cfg.whenEvent,
whenTimeout: cfg.whenTimeout,
startEvent: cfg.whenEvent,
startTimeout: cfg.whenTimeout,
startsRemain: cfg.whenStartsLimit,
stoppingWaitEvent: cfg.stoppingWaitEvent,
stoppingTimeout: cfg.stoppingTimeout,
restartLimit: cfg.restartLimit,
Expand Down Expand Up @@ -144,8 +148,8 @@ func (job *Job) Run(bus *events.EventBus) {
}

startTimeoutSource := fmt.Sprintf("%s.wait-timeout", job.Name)
if job.whenTimeout > 0 {
events.NewEventTimeout(ctx, job.Rx, job.whenTimeout, startTimeoutSource)
if job.startTimeout > 0 {
events.NewEventTimeout(ctx, job.Rx, job.startTimeout, startTimeoutSource)
}

var healthCheckName string
Expand Down Expand Up @@ -177,7 +181,7 @@ func (job *Job) Run(bus *events.EventBus) {
break loop
}
job.restartsRemain--
job.Rx <- job.whenEvent
job.StartJob(ctx)
case events.Event{events.ExitFailed, healthCheckName}:
job.Status = false
job.Bus.Publish(events.Event{events.StatusUnhealthy, job.Name})
Expand All @@ -203,17 +207,22 @@ func (job *Job) Run(bus *events.EventBus) {
break loop
}
job.restartsRemain--
job.Rx <- job.whenEvent
case job.whenEvent:
job.StartJob(ctx)
case job.startEvent:
if job.startsRemain == unlimited || job.startsRemain > 0 {
job.startsRemain--
job.StartJob(ctx)
} else {
break loop
}
}
}
job.cleanup(ctx, cancel)
}()
}

func (job *Job) restartPermitted() bool {
if job.restartLimit == unlimitedRestarts || job.restartsRemain > 0 {
if job.restartLimit == unlimited || job.restartsRemain > 0 {
return true
}
return false
Expand Down
19 changes: 10 additions & 9 deletions jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestJobRunStartupTimeout(t *testing.T) {
ds.Run(time.Duration(1 * time.Second)) // need to leave room to wait for timeouts

cfg := &Config{Name: "myjob", Exec: "true",
When: &WhenConfig{Source: "never", Event: "startup", Timeout: "100ms"}}
When: &WhenConfig{Source: "never", Once: "startup", Timeout: "100ms"}}
cfg.Validate(noop)
job := NewJob(cfg)
job.Run(bus)
Expand Down Expand Up @@ -85,13 +85,15 @@ func TestJobRunRestarts(t *testing.T) {
ds.Run(time.Duration(100 * time.Millisecond))

cfg := &Config{
Name: "myjob",
whenEvent: events.GlobalStartup,
Exec: []string{"./testdata/test.sh", "doStuff", "runRestartsTest"},
Restarts: restarts,
Name: "myjob",
whenEvent: events.GlobalStartup,
whenStartsLimit: 1,
Exec: []string{"./testdata/test.sh", "doStuff", "runRestartsTest"},
Restarts: restarts,
}
cfg.Validate(noop)
job := NewJob(cfg)

job.Run(bus)
job.Bus.Publish(events.GlobalStartup)
exitOk := events.Event{Code: events.ExitSuccess, Source: "myjob"}
Expand All @@ -118,10 +120,9 @@ func TestJobRunPeriodic(t *testing.T) {
ds := mocks.NewDebugSubscriber(bus, 10)

cfg := &Config{
Name: "myjob",
whenEvent: events.GlobalStartup,
Exec: []string{"./testdata/test.sh", "doStuff", "runPeriodicTest"},
When: &WhenConfig{Frequency: "10ms"},
Name: "myjob",
Exec: []string{"./testdata/test.sh", "doStuff", "runPeriodicTest"},
When: &WhenConfig{Frequency: "10ms"},
// we need to make sure we don't have any events getting cut off
// by the test run of 100ms (which would result in flaky tests),
// so this should ensure we get a predictable number within the window
Expand Down
6 changes: 3 additions & 3 deletions jobs/testdata/TestJobConfigServiceWithPreStart.json5
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
exec: "/bin/serviceA.sh",
when: {
source: "preStart",
event: "exitSuccess",
once: "exitSuccess",
},
health: {
exec: "/bin/healthCheckA.sh A1 A2",
Expand All @@ -23,15 +23,15 @@
name: "preStop",
when: {
source: "serviceA",
event: "stopping"
once: "stopping"
},
exec: ["/bin/to/preStop.sh","arg1","arg2"]
},
{
name: "postStop",
when: {
source: "serviceA",
event: "stopped"
once: "stopped"
},
exec: ["/bin/to/postStop.sh"]
}
Expand Down
6 changes: 3 additions & 3 deletions jobs/testdata/TestJobConfigServiceWithStopping.json5
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
exec: "/bin/serviceA.sh",
when: {
source: "preStart",
event: "exitSuccess",
once: "exitSuccess",
},
health: {
interval: 1,
Expand All @@ -21,15 +21,15 @@
name: "preStop",
when: {
source: "serviceA",
event: "stopping"
once: "stopping"
},
exec: ["/bin/to/preStop.sh","arg1","arg2"]
},
{
name: "postStop",
when: {
source: "serviceA",
event: "stopped"
once: "stopped"
},
exec: ["/bin/to/postStop.sh"]
}
Expand Down
Loading

0 comments on commit e342282

Please sign in to comment.