Skip to content

Commit

Permalink
setup ConsulConfig and ConsulExtras (#252)
Browse files Browse the repository at this point in the history
* setup ConsulConfig and ConsulExtras
* Add extra consul documentation
  • Loading branch information
flyinprogrammer authored and tgross committed Dec 8, 2016
1 parent 9397588 commit 348b254
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 30 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func createTelemetryService(t *telemetry.Telemetry, discoveryService discovery.S
t.TTL,
t.Interfaces,
t.Tags,
nil,
discoveryService)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion core/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *NoopServiceBackend) Deregister(service *discovery.ServiceDefinition)

func getSignalTestConfig() *App {
service, _ := services.NewService(
"test-service", 1, 1, 1, nil, nil, &NoopServiceBackend{})
"test-service", 1, 1, 1, nil, nil, nil, &NoopServiceBackend{})
app := EmptyApp()
cmd, _ := commands.NewCommand([]string{
"./testdata/test.sh",
Expand Down
20 changes: 15 additions & 5 deletions discovery/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,27 @@ func (c *Consul) SendHeartbeat(service *discovery.ServiceDefinition) {
}

func (c *Consul) registerService(service discovery.ServiceDefinition) error {
var enableTagOverride bool
if service.ConsulExtras != nil {
enableTagOverride = service.ConsulExtras.EnableTagOverride
}
return c.Agent().ServiceRegister(
&consul.AgentServiceRegistration{
ID: service.ID,
Name: service.Name,
Tags: service.Tags,
Port: service.Port,
Address: service.IPAddress,
ID: service.ID,
Name: service.Name,
Tags: service.Tags,
Port: service.Port,
Address: service.IPAddress,
EnableTagOverride: enableTagOverride,
},
)
}

func (c *Consul) registerCheck(service discovery.ServiceDefinition) error {
var deregisterCriticalServiceAfter string
if service.ConsulExtras != nil {
deregisterCriticalServiceAfter = service.ConsulExtras.DeregisterCriticalServiceAfter
}
return c.Agent().CheckRegister(
&consul.AgentCheckRegistration{
ID: service.ID,
Expand All @@ -145,6 +154,7 @@ func (c *Consul) registerCheck(service discovery.ServiceDefinition) error {
ServiceID: service.ID,
AgentServiceCheck: consul.AgentServiceCheck{
TTL: fmt.Sprintf("%ds", service.TTL),
DeregisterCriticalServiceAfter: deregisterCriticalServiceAfter,
},
},
)
Expand Down
56 changes: 56 additions & 0 deletions discovery/consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ func setupConsul(serviceName string) (*Consul, *discovery.ServiceDefinition) {
return consul, service
}

func setupWaitForLeader(consul *Consul) error {
maxRetry := 30
retry := 0
var err error

// we need to wait for Consul to start and self-elect
for ; retry < maxRetry; retry++ {
if retry > 0 {
time.Sleep(1 * time.Second)
}
if leader, err := consul.Status().Leader(); err == nil && leader != "" {
break
}
}
return err
}

func TestConsulObjectParse(t *testing.T) {
rawCfg := map[string]interface{}{
"address": "consul:8501",
Expand Down Expand Up @@ -57,6 +74,9 @@ func runParseTest(t *testing.T, uri, expectedAddress, expectedScheme string) {

func TestConsulTTLPass(t *testing.T) {
consul, service := setupConsul("service-TestConsulTTLPass")
if err := setupWaitForLeader(consul); err != nil {
t.Errorf("Consul leader could not be elected.")
}
id := service.ID

consul.SendHeartbeat(service) // force registration and 1st heartbeat
Expand All @@ -70,6 +90,9 @@ func TestConsulTTLPass(t *testing.T) {
func TestConsulCheckForChanges(t *testing.T) {
backend := "service-TestConsulCheckForChanges"
consul, service := setupConsul(backend)
if err := setupWaitForLeader(consul); err != nil {
t.Errorf("Consul leader could not be elected.")
}
id := service.ID
if consul.CheckForUpstreamChanges(backend, "") {
t.Fatalf("First read of %s should show `false` for change", id)
Expand All @@ -88,3 +111,36 @@ func TestConsulCheckForChanges(t *testing.T) {
t.Errorf("%v should have changed after TTL expired.", id)
}
}

func TestConsulEnableTagOverride(t *testing.T) {
backend := "service-TestConsulEnableTagOverride"
consul, _ := NewConsulConfig("consul:8500")
if err := setupWaitForLeader(consul); err != nil {
t.Errorf("Consul leader could not be elected.")
}
service := &discovery.ServiceDefinition{
ID: backend,
Name: backend,
IPAddress: "192.168.1.1",
TTL: 1,
Port: 9000,
ConsulExtras: &discovery.ConsulExtras{
EnableTagOverride: true,
},
}
id := service.ID
if consul.CheckForUpstreamChanges(backend, "") {
t.Fatalf("First read of %s should show `false` for change", id)
}
consul.SendHeartbeat(service) // force registration
catalogService, _, err := consul.Catalog().Service(id, "", nil)
if err != nil {
t.Fatalf("Error finding service: %v", err)
}

for _, service := range catalogService {
if service.ServiceEnableTagOverride != true {
t.Errorf("%v should have had EnableTagOverride set to true", id)
}
}
}
19 changes: 13 additions & 6 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ type ServiceBackend interface {
// ServiceDefinition is the concrete service structure that is
// registered with the service discovery backend.
type ServiceDefinition struct {
ID string
Name string
Port int
TTL int
Tags []string
IPAddress string
ID string
Name string
Port int
TTL int
Tags []string
IPAddress string
ConsulExtras *ConsulExtras
}

// ConsulExtras handles additional Consul configuration.
type ConsulExtras struct {
EnableTagOverride bool `mapstructure:"enableTagOverride"`
DeregisterCriticalServiceAfter string `mapstructure:"deregisterCriticalServiceAfter"`
}

// ServiceDiscoveryConfigHook parses a raw service discovery config
Expand Down
10 changes: 8 additions & 2 deletions documentation/12-configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ The format of the JSON file configuration is as follows:
"poll": 10,
"ttl": 30,
"timeout": "10s",
"tags": ["tag1"]
"tags": ["tag1"],
"consul": {
"enableTagOverride": true,
"deregisterCriticalServiceAfter": "90m"
}
}
],
"backends": [
Expand Down Expand Up @@ -124,7 +128,9 @@ The format of the JSON file configuration is as follows:
- `ttl` is the time-to-live of a successful health check. This should be longer than the polling rate so that the polling process and the TTL aren't racing; otherwise Consul will mark the service as unhealthy.
- `tags` is an optional array of tags. If the discovery service supports it (Consul does), the service will register itself with these tags.
- `timeout` an optional value to wait before forcibly killing the health check. Health checks killed in this way are terminated immediately (`SIGKILL`) without an opportunity to clean up their state. This means that a heartbeat will not be sent. The minimum timeout is `1ms`. Omitting this field means that ContainerPilot will wait indefinitely for the health check. *Deprecation warning:* in ContainerPilot 3.0 this will default to the `poll` time.

- `consul` an optional block of consul specific service configuration.
- [`enableTagOverride`](https://www.consul.io/docs/agent/services.html) if set to `true`, then external agents can update this service in the catalog and modify the tags.
- [`deregisterCriticalServiceAfter`](https://www.consul.io/docs/agent/http/agent.html) is a timeout in Go time format. If a check is in the critical state for more than this configured value, then its associated service (and all of its associated checks) will automatically be deregistered.

### `backends`

Expand Down
3 changes: 0 additions & 3 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ LDFLAGS := -X ${IMPORT_PATH}/core.GitHash='$(shell git rev-parse --short HEAD)'

ROOT := $(shell pwd)

COMPOSE_PREFIX_ETCD := exetcd
COMPOSE_PREFIX_CONSUL := exconsul

DOCKERRUN := docker run --rm \
--link containerpilot_consul:consul \
--link containerpilot_etcd:etcd \
Expand Down
50 changes: 37 additions & 13 deletions services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ type Service struct {
Tags []string `mapstructure:"tags"`
Timeout string `mapstructure:"timeout"`
IPAddress string
ConsulConfig *ConsulConfig `mapstructure:"consul"`
healthCheckCmd *commands.Command
discoveryService discovery.ServiceBackend
definition *discovery.ServiceDefinition
}

// ConsulConfig handles additional Consul configuration.
type ConsulConfig struct {
EnableTagOverride bool `mapstructure:"enableTagOverride"`
DeregisterCriticalServiceAfter string `mapstructure:"deregisterCriticalServiceAfter"`
}

// NewServices new services from a raw config
func NewServices(raw []interface{}, disc discovery.ServiceBackend) ([]*Service, error) {
if raw == nil {
Expand All @@ -47,14 +54,15 @@ func NewServices(raw []interface{}, disc discovery.ServiceBackend) ([]*Service,

// NewService creates a new service
func NewService(name string, poll, port, ttl int, interfaces interface{},
tags []string, disc discovery.ServiceBackend) (*Service, error) {
tags []string, consulConfig *ConsulConfig, disc discovery.ServiceBackend) (*Service, error) {
service := &Service{
Name: name,
Poll: poll,
Port: port,
TTL: ttl,
Interfaces: interfaces,
Tags: tags,
Name: name,
Poll: poll,
Port: port,
TTL: ttl,
Interfaces: interfaces,
Tags: tags,
ConsulConfig: consulConfig,
}
if err := parseService(service, disc); err != nil {
return nil, err
Expand Down Expand Up @@ -101,13 +109,29 @@ func parseService(s *Service, disc discovery.ServiceBackend) error {
}
s.IPAddress = ipAddress

var consulExtras *discovery.ConsulExtras
if s.ConsulConfig != nil {

if s.ConsulConfig.DeregisterCriticalServiceAfter != "" {
if _, err := time.ParseDuration(s.ConsulConfig.DeregisterCriticalServiceAfter); err != nil {
return fmt.Errorf("Could not parse consul `deregisterCriticalServiceAfter` in service %s: %s", s.Name, err)
}
}

consulExtras = &discovery.ConsulExtras{
DeregisterCriticalServiceAfter: s.ConsulConfig.DeregisterCriticalServiceAfter,
EnableTagOverride: s.ConsulConfig.EnableTagOverride,
}
}

s.definition = &discovery.ServiceDefinition{
ID: s.ID,
Name: s.Name,
Port: s.Port,
TTL: s.TTL,
Tags: s.Tags,
IPAddress: s.IPAddress,
ID: s.ID,
Name: s.Name,
Port: s.Port,
TTL: s.TTL,
Tags: s.Tags,
IPAddress: s.IPAddress,
ConsulExtras: consulExtras,
}
return nil
}
Expand Down
96 changes: 96 additions & 0 deletions services/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,102 @@ func TestServicesConfigError(t *testing.T) {
"Could not parse `health` in service myName: time: invalid duration xx")
}

func TestServicesConsulConfigEnableTagOverride(t *testing.T) {
jsonFragment := []byte(`[
{
"name": "serviceA",
"port": 8080,
"interfaces": "eth0",
"health": ["/bin/to/healthcheck/for/service/A.sh", "A1", "A2"],
"poll": 30,
"ttl": 19,
"timeout": "1ms",
"tags": ["tag1","tag2"],
"consul": {
"enableTagOverride": true
}
}
]`)

if services, err := NewServices(decodeJSONRawService(t, jsonFragment), nil); err != nil {
t.Fatalf("Could not parse service JSON: %s", err)
} else {
if services[0].ConsulConfig.EnableTagOverride != true {
t.Errorf("ConsulConfig should have had EnableTagOverride set to true.")
}
}
}

func TestInvalidServicesConsulConfigEnableTagOverride(t *testing.T) {
jsonFragment := []byte(`[
{
"name": "serviceA",
"port": 8080,
"interfaces": "eth0",
"health": ["/bin/to/healthcheck/for/service/A.sh", "A1", "A2"],
"poll": 30,
"ttl": 19,
"timeout": "1ms",
"tags": ["tag1","tag2"],
"consul": {
"enableTagOverride": "nope"
}
}
]`)

if _, err := NewServices(decodeJSONRawService(t, jsonFragment), nil); err == nil {
t.Errorf("ConsulConfig should have thrown error about EnableTagOverride being a string.")
}
}

func TestServicesConsulConfigDeregisterCriticalServiceAfter(t *testing.T) {
jsonFragment := []byte(`[
{
"name": "serviceA",
"port": 8080,
"interfaces": "eth0",
"health": ["/bin/to/healthcheck/for/service/A.sh", "A1", "A2"],
"poll": 30,
"ttl": 19,
"timeout": "1ms",
"tags": ["tag1","tag2"],
"consul": {
"deregisterCriticalServiceAfter": "40m"
}
}
]`)

if services, err := NewServices(decodeJSONRawService(t, jsonFragment), nil); err != nil {
t.Fatalf("Could not parse service JSON: %s", err)
} else {
if services[0].ConsulConfig.DeregisterCriticalServiceAfter != "40m" {
t.Errorf("ConsulConfig should have had DeregisterCriticalServiceAfter set to '40m'.")
}
}
}

func TestInvalidServicesConsulConfigDeregisterCriticalServiceAfter(t *testing.T) {
jsonFragment := []byte(`[
{
"name": "serviceA",
"port": 8080,
"interfaces": "eth0",
"health": ["/bin/to/healthcheck/for/service/A.sh", "A1", "A2"],
"poll": 30,
"ttl": 19,
"timeout": "1ms",
"tags": ["tag1","tag2"],
"consul": {
"deregisterCriticalServiceAfter": "nope"
}
}
]`)

if _, err := NewServices(decodeJSONRawService(t, jsonFragment), nil); err == nil {
t.Errorf("Error should have been generated for duration 'nope'.")
}
}

// ------------------------------------------
// test helpers

Expand Down

0 comments on commit 348b254

Please sign in to comment.