From 2e5fa36ad6cadc279a3359fedaed9ee77cee8de5 Mon Sep 17 00:00:00 2001 From: Josh Snyder Date: Thu, 16 Jun 2016 19:47:20 +0000 Subject: [PATCH 1/4] Make it possible to use the Zookeeper watcher in standalone mode --- lib/synapse/service_watcher/zookeeper.rb | 68 +++++++++++++----------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 19f82774..9dea48f9 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -29,14 +29,14 @@ def initialize(opts={}, synapse) end end - def start + def start(initial_discover=true) @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil @zk = nil log.info "synapse: starting ZK watcher #{@name} @ hosts: #{@zk_hosts}, path: #{@discovery['path']}" - zk_connect + zk_connect(initial_discover) end def stop @@ -50,6 +50,34 @@ def ping? @zk && @zk.connected? end + def read_zk + new_backends = [] + @zk.children(@discovery['path'], :watch => true).each do |id| + node = @zk.get("#{@discovery['path']}/#{id}") + + begin + # TODO: Do less munging, or refactor out this processing + host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first) + rescue StandardError => e + log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" + else + server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = id.split('_').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + new_backends << { + 'name' => name, 'host' => host, 'port' => server_port, + 'id' => numeric_id, 'weight' => weight, + 'haproxy_server_options' => haproxy_server_options, + 'labels' => labels + } + end + end + end + private def validate_discovery_opts @@ -124,33 +152,7 @@ def create(path) # find the current backends at the discovery path def discover log.info "synapse: discovering backends for service #{@name}" - - new_backends = [] - @zk.children(@discovery['path'], :watch => true).each do |id| - node = @zk.get("#{@discovery['path']}/#{id}") - - begin - # TODO: Do less munging, or refactor out this processing - host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first) - rescue StandardError => e - log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" - else - server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port - - # find the numberic id in the node name; used for leader elections if enabled - numeric_id = id.split('_').last - numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil - - log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { - 'name' => name, 'host' => host, 'port' => server_port, - 'id' => numeric_id, 'weight' => weight, - 'haproxy_server_options' => haproxy_server_options, - 'labels' => labels - } - end - end - + new_backends = read_zk set_backends(new_backends) end @@ -206,7 +208,7 @@ def zk_cleanup log.info "synapse: zookeeper watcher cleaned up successfully" end - def zk_connect + def zk_connect(initial_discover) log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}" # Ensure that all Zookeeper watcher re-use a single zookeeper @@ -236,8 +238,10 @@ def zk_connect # the path must exist, otherwise watch callbacks will not work create(@discovery['path']) - # call the callback to bootstrap the process - watcher_callback.call + if initial_discover + # call the callback to bootstrap the process + watcher_callback.call + end end # decode the data at a zookeeper endpoint From 430951823f0d02daed6909ae254110470095460f Mon Sep 17 00:00:00 2001 From: Josh Snyder Date: Thu, 16 Jun 2016 20:30:36 +0000 Subject: [PATCH 2/4] Add the standalone mode binary --- bin/synapse_standalone | 11 +++++++++++ lib/synapse/service_watcher/zookeeper.rb | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100755 bin/synapse_standalone diff --git a/bin/synapse_standalone b/bin/synapse_standalone new file mode 100755 index 00000000..cb5c96c6 --- /dev/null +++ b/bin/synapse_standalone @@ -0,0 +1,11 @@ +#!/usr/bin/env ruby +require "json" +require "synapse/service_watcher" + +service_data = JSON.parse(ARGV[0]) +watcher = Synapse::ServiceWatcher.create('', { + "discovery" => service_data, + 'haproxy' => {}, +}, {}) +watcher.start(initial_discover: false) +puts JSON.pretty_generate(watcher.read_zk) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 9dea48f9..de29583f 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -29,7 +29,7 @@ def initialize(opts={}, synapse) end end - def start(initial_discover=true) + def start(initial_discover: true) @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil From 8d3ba243faa13a4151462ab71a59c8f1eb46470c Mon Sep 17 00:00:00 2001 From: Josh Snyder Date: Thu, 16 Jun 2016 20:40:11 +0000 Subject: [PATCH 3/4] Fixes --- lib/synapse/service_watcher/zookeeper.rb | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index de29583f..e0814b4c 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -29,7 +29,7 @@ def initialize(opts={}, synapse) end end - def start(initial_discover: true) + def start(initial_discover=true) @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil @@ -51,8 +51,7 @@ def ping? end def read_zk - new_backends = [] - @zk.children(@discovery['path'], :watch => true).each do |id| + @zk.children(@discovery['path'], :watch => true).collect do |id| node = @zk.get("#{@discovery['path']}/#{id}") begin @@ -60,6 +59,7 @@ def read_zk host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first) rescue StandardError => e log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" + nil else server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port @@ -68,14 +68,14 @@ def read_zk numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { + { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id, 'weight' => weight, 'haproxy_server_options' => haproxy_server_options, 'labels' => labels } end - end + end.compact end private @@ -260,4 +260,3 @@ def deserialize_service_instance(data) end end end - From 010722d2f074b6fa14ca2f124388e34a9c4223c2 Mon Sep 17 00:00:00 2001 From: Josh Snyder Date: Thu, 16 Jun 2016 20:46:08 +0000 Subject: [PATCH 4/4] Don't be ZK specific in the interface --- bin/synapse_standalone | 2 +- lib/synapse/service_watcher/zookeeper.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/synapse_standalone b/bin/synapse_standalone index cb5c96c6..4e3fc088 100755 --- a/bin/synapse_standalone +++ b/bin/synapse_standalone @@ -8,4 +8,4 @@ watcher = Synapse::ServiceWatcher.create('', { 'haproxy' => {}, }, {}) watcher.start(initial_discover: false) -puts JSON.pretty_generate(watcher.read_zk) +puts JSON.pretty_generate(watcher.read) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index e0814b4c..fe6c42a9 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -50,7 +50,7 @@ def ping? @zk && @zk.connected? end - def read_zk + def read @zk.children(@discovery['path'], :watch => true).collect do |id| node = @zk.get("#{@discovery['path']}/#{id}") @@ -152,7 +152,7 @@ def create(path) # find the current backends at the discovery path def discover log.info "synapse: discovering backends for service #{@name}" - new_backends = read_zk + new_backends = read set_backends(new_backends) end