Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-host compatible etcd service watcher #86

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM phusion/baseimage:0.9.10
ENV HOME /root
CMD ["/sbin/my_init"]
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

36 changes: 19 additions & 17 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PATH
synapse (0.11.1)
aws-sdk (~> 1.39)
docker-api (~> 1.7.2)
etcd (~> 0.2.3)
zk (~> 1.9.4)

GEM
Expand All @@ -21,6 +22,8 @@ GEM
json
excon (0.38.0)
ffi (1.9.3-java)
etcd (0.2.3)
mixlib-log
json (1.8.1)
json (1.8.1-java)
little-plugger (1.1.3)
Expand All @@ -33,27 +36,27 @@ GEM
nokogiri (1.6.2.1)
mini_portile (= 0.6.0)
nokogiri (1.6.2.1-java)
pry (0.9.12.2)
pry (0.9.12.6)
coderay (~> 1.0.5)
mixlib-log (1.6.0)
method_source (~> 0.8)
slop (~> 3.4)
pry (0.9.12.2-java)
coderay (~> 1.0.5)
method_source (~> 0.8)
slop (~> 3.4)
spoon (~> 0.0)
pry-nav (0.2.3)
pry (~> 0.9.10)
rake (10.1.1)
rspec (2.14.1)
rspec-core (~> 2.14.0)
rspec-expectations (~> 2.14.0)
rspec-mocks (~> 2.14.0)
rspec-core (2.14.5)
rspec-expectations (2.14.2)
diff-lcs (>= 1.1.3, < 2.0)
rspec-mocks (2.14.3)
slop (3.4.6)
rake (10.3.2)
rspec (3.0.0)
rspec-core (~> 3.0.0)
rspec-expectations (~> 3.0.0)
rspec-mocks (~> 3.0.0)
rspec-core (3.0.2)
rspec-support (~> 3.0.0)
rspec-expectations (3.0.2)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.0.0)
rspec-mocks (3.0.2)
rspec-support (~> 3.0.0)
rspec-support (3.0.2)
slop (3.5.0)
slyphon-log4j (1.2.15)
slyphon-zookeeper_jar (3.3.5-java)
spoon (0.0.4)
Expand All @@ -67,7 +70,6 @@ GEM
slyphon-zookeeper_jar (= 3.3.5)

PLATFORMS
java
ruby

DEPENDENCIES
Expand Down
3 changes: 3 additions & 0 deletions lib/synapse/service_watcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "synapse/service_watcher/dns"
require "synapse/service_watcher/docker"
require "synapse/service_watcher/zookeeper_dns"
require "synapse/service_watcher/etcd"

module Synapse
class ServiceWatcher
Expand All @@ -15,6 +16,7 @@ class ServiceWatcher
'dns' => DnsWatcher,
'docker' => DockerWatcher,
'zookeeper_dns' => ZookeeperDnsWatcher,
'etcd' => EtcdWatcher
}

# the method which actually dispatches watcher creation requests
Expand All @@ -32,3 +34,4 @@ def self.create(name, opts, synapse)
end
end
end

187 changes: 187 additions & 0 deletions lib/synapse/service_watcher/etcd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
require "synapse/service_watcher/base"

require 'etcd'

# Monkeypatch till 91f9e72d6d57ae3760e9266835f404d986072590 gets to rubygems..
module Etcd
module Keys
def watch(key, opts = {})
params = { wait: true }
fail ArgumentError, 'Second argument must be a hash' unless opts.is_a?(Hash)
timeout = opts[:timeout] || @read_timeout
index = opts[:waitIndex] || opts[:index]
params[:waitIndex] = index unless index.nil?
params[:consistent] = opts[:consistent] if opts.key?(:consistent)
params[:recursive] = opts[:recursive] if opts.key?(:recursive)

response = api_execute(key_endpoint + key, :get,
timeout: timeout, params: params)
Response.from_http_response(response)
end
end
end

module Synapse
class EtcdWatcher < BaseWatcher
NUMBERS_RE = /^\d+$/

def start
@etcd_hosts = @discovery['hosts'].shuffle

log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}"
@should_exit = false

@etcd_hosts.each do |h|
host, port = h.split(':')
port = port || 4003
@etcd = ::Etcd.client(:host => host, :port => port)

connected =
begin
@etcd.leader
rescue
false
end

break if connected
end

# call the callback to bootstrap the process
discover
@synapse.reconfigure!
@watcher = Thread.new do
watch
end
end

def stop
log.warn "synapse: etcd watcher exiting"

@should_exit = true
@etcd = nil

log.info "synapse: etcd watcher cleaned up successfully"
end

def ping?
@etcd.leader
end

private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
unless @discovery['method'] == 'etcd'
raise ArgumentError, "missing or invalid etcd host for service #{@name}" \
unless @discovery['host']
raise ArgumentError, "missing or invalid etcd port for service #{@name}" \
unless @discovery['port']
raise ArgumentError, "invalid etcd path for service #{@name}" \
unless @discovery['path']
end

# helper method that ensures that the discovery path exists
def create(path)
log.debug "synapse: creating etcd path: #{path}"
@etcd.create(path, dir: true)
end

def each_node(node)
begin
host, port, name = deserialize_service_instance(node.value)
rescue StandardError => e
log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}"
nil
else
server_port = @server_port_override ? @server_port_override : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = node.key.split('/').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
{ 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end
end

def each_dir(d)
new_backends = []
d.children.each do |node|
if node.directory?
new_backends << each_dir(@etcd.get(node.key))
else
backend = each_node(node)
if backend
new_backends << backend
end
end
end
new_backends.flatten
end

# find the current backends at the discovery path; sets @backends
def discover
log.info "synapse: discovering backends for service #{@name}"

d = nil
begin
d = @etcd.get(@discovery['path'])
rescue Etcd::KeyNotFound
create(@discovery['path'])
d = @etcd.get(@discovery['path'])
end

new_backends = []
if d.directory?
new_backends = each_dir(d)
else
log.warn "synapse: path #{@discovery['path']} is not a directory"
end

if new_backends.empty?
if @default_servers.empty?
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
false
else
log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}"
@backends = @default_servers
true
end
else
if @backends != new_backends
log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}"
@backends = new_backends
true
else
log.info "synapse: discovered #{new_backends.length} backends for service #{@name}"
false
end
end
end

def watch
while !@should_exit
begin
@etcd.watch(@discovery['path'], :timeout => 60, :recursive => true)
rescue Timeout::Error
else
if discover
@synapse.reconfigure!
end
end
end
end

# decode the data at a zookeeper endpoint
def deserialize_service_instance(data)
log.debug "synapse: deserializing process data"
decoded = JSON.parse(data)

host = decoded['host'] || (raise ValueError, 'instance json data does not have host key')
port = decoded['port'] || (raise ValueError, 'instance json data does not have port key')
name = decoded['name'] || nil

return host, port, name
end
end
end

1 change: 1 addition & 0 deletions synapse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "aws-sdk", "~> 1.39"
gem.add_runtime_dependency "docker-api", "~> 1.7.2"
gem.add_runtime_dependency "zk", "~> 1.9.4"
gem.add_runtime_dependency "etcd", "~> 0.2.3"

gem.add_development_dependency "rake"
gem.add_development_dependency "rspec"
Expand Down