diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..f936f767 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM phusion/baseimage:0.9.10 +ENV HOME /root +CMD ["/sbin/my_init"] +RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + diff --git a/Gemfile.lock b/Gemfile.lock index 05b34343..c66fef14 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH synapse (0.11.1) aws-sdk (~> 1.39) docker-api (~> 1.7.2) + etcd (~> 0.2.3) zk (~> 1.9.4) GEM @@ -21,6 +22,8 @@ GEM json excon (0.38.0) ffi (1.9.3-java) + etcd (0.2.3) + mixlib-log json (1.8.1) json (1.8.1-java) little-plugger (1.1.3) @@ -33,27 +36,27 @@ GEM nokogiri (1.6.2.1) mini_portile (= 0.6.0) nokogiri (1.6.2.1-java) - pry (0.9.12.2) + pry (0.9.12.6) coderay (~> 1.0.5) + mixlib-log (1.6.0) method_source (~> 0.8) slop (~> 3.4) - pry (0.9.12.2-java) - coderay (~> 1.0.5) - method_source (~> 0.8) - slop (~> 3.4) - spoon (~> 0.0) pry-nav (0.2.3) pry (~> 0.9.10) - rake (10.1.1) - rspec (2.14.1) - rspec-core (~> 2.14.0) - rspec-expectations (~> 2.14.0) - rspec-mocks (~> 2.14.0) - rspec-core (2.14.5) - rspec-expectations (2.14.2) - diff-lcs (>= 1.1.3, < 2.0) - rspec-mocks (2.14.3) - slop (3.4.6) + rake (10.3.2) + rspec (3.0.0) + rspec-core (~> 3.0.0) + rspec-expectations (~> 3.0.0) + rspec-mocks (~> 3.0.0) + rspec-core (3.0.2) + rspec-support (~> 3.0.0) + rspec-expectations (3.0.2) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.0.0) + rspec-mocks (3.0.2) + rspec-support (~> 3.0.0) + rspec-support (3.0.2) + slop (3.5.0) slyphon-log4j (1.2.15) slyphon-zookeeper_jar (3.3.5-java) spoon (0.0.4) @@ -67,7 +70,6 @@ GEM slyphon-zookeeper_jar (= 3.3.5) PLATFORMS - java ruby DEPENDENCIES diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index ee05e6c2..a5fb143a 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -4,6 +4,7 @@ require "synapse/service_watcher/dns" require "synapse/service_watcher/docker" require "synapse/service_watcher/zookeeper_dns" +require "synapse/service_watcher/etcd" module Synapse class ServiceWatcher @@ -15,6 +16,7 @@ class ServiceWatcher 'dns' => DnsWatcher, 'docker' => DockerWatcher, 'zookeeper_dns' => ZookeeperDnsWatcher, + 'etcd' => EtcdWatcher } # the method which actually dispatches watcher creation requests @@ -32,3 +34,4 @@ def self.create(name, opts, synapse) end end end + diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb new file mode 100644 index 00000000..e392f701 --- /dev/null +++ b/lib/synapse/service_watcher/etcd.rb @@ -0,0 +1,187 @@ +require "synapse/service_watcher/base" + +require 'etcd' + +# Monkeypatch till 91f9e72d6d57ae3760e9266835f404d986072590 gets to rubygems.. +module Etcd + module Keys + def watch(key, opts = {}) + params = { wait: true } + fail ArgumentError, 'Second argument must be a hash' unless opts.is_a?(Hash) + timeout = opts[:timeout] || @read_timeout + index = opts[:waitIndex] || opts[:index] + params[:waitIndex] = index unless index.nil? + params[:consistent] = opts[:consistent] if opts.key?(:consistent) + params[:recursive] = opts[:recursive] if opts.key?(:recursive) + + response = api_execute(key_endpoint + key, :get, + timeout: timeout, params: params) + Response.from_http_response(response) + end + end +end + +module Synapse + class EtcdWatcher < BaseWatcher + NUMBERS_RE = /^\d+$/ + + def start + @etcd_hosts = @discovery['hosts'].shuffle + + log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" + @should_exit = false + + @etcd_hosts.each do |h| + host, port = h.split(':') + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) + + connected = + begin + @etcd.leader + rescue + false + end + + break if connected + end + + # call the callback to bootstrap the process + discover + @synapse.reconfigure! + @watcher = Thread.new do + watch + end + end + + def stop + log.warn "synapse: etcd watcher exiting" + + @should_exit = true + @etcd = nil + + log.info "synapse: etcd watcher cleaned up successfully" + end + + def ping? + @etcd.leader + end + + private + def validate_discovery_opts + raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ + unless @discovery['method'] == 'etcd' + raise ArgumentError, "missing or invalid etcd host for service #{@name}" \ + unless @discovery['host'] + raise ArgumentError, "missing or invalid etcd port for service #{@name}" \ + unless @discovery['port'] + raise ArgumentError, "invalid etcd path for service #{@name}" \ + unless @discovery['path'] + end + + # helper method that ensures that the discovery path exists + def create(path) + log.debug "synapse: creating etcd path: #{path}" + @etcd.create(path, dir: true) + end + + def each_node(node) + begin + host, port, name = deserialize_service_instance(node.value) + rescue StandardError => e + log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}" + nil + else + server_port = @server_port_override ? @server_port_override : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = node.key.split('/').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end + end + + def each_dir(d) + new_backends = [] + d.children.each do |node| + if node.directory? + new_backends << each_dir(@etcd.get(node.key)) + else + backend = each_node(node) + if backend + new_backends << backend + end + end + end + new_backends.flatten + end + + # find the current backends at the discovery path; sets @backends + def discover + log.info "synapse: discovering backends for service #{@name}" + + d = nil + begin + d = @etcd.get(@discovery['path']) + rescue Etcd::KeyNotFound + create(@discovery['path']) + d = @etcd.get(@discovery['path']) + end + + new_backends = [] + if d.directory? + new_backends = each_dir(d) + else + log.warn "synapse: path #{@discovery['path']} is not a directory" + end + + if new_backends.empty? + if @default_servers.empty? + log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + false + else + log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" + @backends = @default_servers + true + end + else + if @backends != new_backends + log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" + @backends = new_backends + true + else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" + false + end + end + end + + def watch + while !@should_exit + begin + @etcd.watch(@discovery['path'], :timeout => 60, :recursive => true) + rescue Timeout::Error + else + if discover + @synapse.reconfigure! + end + end + end + end + + # decode the data at a zookeeper endpoint + def deserialize_service_instance(data) + log.debug "synapse: deserializing process data" + decoded = JSON.parse(data) + + host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') + port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') + name = decoded['name'] || nil + + return host, port, name + end + end +end + diff --git a/synapse.gemspec b/synapse.gemspec index c07f9bf5..06b41f22 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "aws-sdk", "~> 1.39" gem.add_runtime_dependency "docker-api", "~> 1.7.2" gem.add_runtime_dependency "zk", "~> 1.9.4" + gem.add_runtime_dependency "etcd", "~> 0.2.3" gem.add_development_dependency "rake" gem.add_development_dependency "rspec"