diff --git a/bin/synapse_standalone b/bin/synapse_standalone new file mode 100755 index 00000000..4e3fc088 --- /dev/null +++ b/bin/synapse_standalone @@ -0,0 +1,11 @@ +#!/usr/bin/env ruby +require "json" +require "synapse/service_watcher" + +service_data = JSON.parse(ARGV[0]) +watcher = Synapse::ServiceWatcher.create('', { + "discovery" => service_data, + 'haproxy' => {}, +}, {}) +watcher.start(initial_discover: false) +puts JSON.pretty_generate(watcher.read) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 19f82774..fe6c42a9 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -29,14 +29,14 @@ def initialize(opts={}, synapse) end end - def start + def start(initial_discover=true) @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil @zk = nil log.info "synapse: starting ZK watcher #{@name} @ hosts: #{@zk_hosts}, path: #{@discovery['path']}" - zk_connect + zk_connect(initial_discover) end def stop @@ -50,6 +50,34 @@ def ping? @zk && @zk.connected? end + def read + @zk.children(@discovery['path'], :watch => true).collect do |id| + node = @zk.get("#{@discovery['path']}/#{id}") + + begin + # TODO: Do less munging, or refactor out this processing + host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first) + rescue StandardError => e + log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" + nil + else + server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = id.split('_').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + { + 'name' => name, 'host' => host, 'port' => server_port, + 'id' => numeric_id, 'weight' => weight, + 'haproxy_server_options' => haproxy_server_options, + 'labels' => labels + } + end + end.compact + end + private def validate_discovery_opts @@ -124,33 +152,7 @@ def create(path) # find the current backends at the discovery path def discover log.info "synapse: discovering backends for service #{@name}" - - new_backends = [] - @zk.children(@discovery['path'], :watch => true).each do |id| - node = @zk.get("#{@discovery['path']}/#{id}") - - begin - # TODO: Do less munging, or refactor out this processing - host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first) - rescue StandardError => e - log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" - else - server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port - - # find the numberic id in the node name; used for leader elections if enabled - numeric_id = id.split('_').last - numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil - - log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { - 'name' => name, 'host' => host, 'port' => server_port, - 'id' => numeric_id, 'weight' => weight, - 'haproxy_server_options' => haproxy_server_options, - 'labels' => labels - } - end - end - + new_backends = read set_backends(new_backends) end @@ -206,7 +208,7 @@ def zk_cleanup log.info "synapse: zookeeper watcher cleaned up successfully" end - def zk_connect + def zk_connect(initial_discover) log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}" # Ensure that all Zookeeper watcher re-use a single zookeeper @@ -236,8 +238,10 @@ def zk_connect # the path must exist, otherwise watch callbacks will not work create(@discovery['path']) - # call the callback to bootstrap the process - watcher_callback.call + if initial_discover + # call the callback to bootstrap the process + watcher_callback.call + end end # decode the data at a zookeeper endpoint @@ -256,4 +260,3 @@ def deserialize_service_instance(data) end end end -