From 73c317b2600bfaa52c3c1ea2adadb928f693dc5b Mon Sep 17 00:00:00 2001 From: myhr Date: Sun, 4 Dec 2016 09:58:47 +0200 Subject: [PATCH 1/2] add watcher for YARN application that was deploy with Apache Slider. Checked in production of Playtech with 2.7.2 YARN and slide 0.91 on HDP 2.5. Main goal add service discovery for exports per container. --- README.md | 32 +++ config/yarn_slider.example.conf | 58 +++++ lib/synapse/service_watcher/yarn_slider.rb | 127 ++++++++++ .../service_watcher_yarn_slider_spec.rb | 228 ++++++++++++++++++ 4 files changed, 445 insertions(+) create mode 100644 config/yarn_slider.example.conf create mode 100644 lib/synapse/service_watcher/yarn_slider.rb create mode 100644 spec/lib/synapse/service_watcher_yarn_slider_spec.rb diff --git a/README.md b/README.md index 5a017b9e..1b80035c 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,38 @@ It takes the following options: * `check_interval`: How often to request the list of tasks from Marathon (default: 10 seconds) * `port_index`: Index of the backend port in the task's "ports" array. (default: 0) +##### YARN Service Registry With Apache Slider ##### + +This watcher polls the [YARN API](https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API) and retrieves a list of yarn [tracking urls](lib/synapse/service_watcher/README.md) +for a given application by tag, only in RUNNING state. +Then it use [slier api](http://slider.incubator.apache.org/docs/slider_specs/specifying_exports.html#exporting-formatted-data-at-component-instance-level) for getting published config per container. +Slider exports must be configured per component and match pattern `host:port`. +Example of slider exports response +```json +{ +"description":"ComponentInstanceData", +"updated":0, +"entries":{ + "container_e02_1480417404363_0011_02_000002.server_port":"dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.server_port":"dn0.dev:50006" +}, +"empty":false +} +``` +where `server_port` is `parameter_sufix` +YARN container_id will be used as instance identifier. + +Tested with YARN 2.7.2 and slider 0.91 + +It takes the following options: + +* `yarn_api_url`: Address of the YARN Resource Manager API (e.g. `http://dn0.dev:8088`) +* `application_name`: [Name of the application](https://slider.incubator.apache.org/docs/slider_specs/application_definition.html) in slider, must present in yarn application description as tag. like`name: {application_name}` +* `yarn_apps_path`: optional YARN Resource Manager context path to apps +* `slider_componentinstance_path`: optional slider context path that will be concat with yarn application master tracking url +* `check_interval`: optional How often to request the list of tasks from Marathon (default: 10 seconds) +* `parameter_sufix`: optional String that will be looking in json published by slider. (default: 0) + #### Listing Default Servers #### You may list a number of default servers providing a service. diff --git a/config/yarn_slider.example.conf b/config/yarn_slider.example.conf new file mode 100644 index 00000000..89d3df67 --- /dev/null +++ b/config/yarn_slider.example.conf @@ -0,0 +1,58 @@ +{ + "services": { + "kafka_feeder": { + "default_servers": [], + "discovery": { + "method": "yarn_slider", + "yarn_api_url": "http://dn0.dev:8088/", + "application_name": "fee" + }, + "haproxy": { + "port": 3213, + "server_options": "check inter 2s rise 3 fall 2", + "listen": [ + "mode http", + "option httpchk /health", + "http-check expect string OK" + ] + } + + } + }, + "haproxy": { + "reload_command": "sudo service haproxy reload", + "config_file_path": "/etc/haproxy/haproxy.cfg", + "socket_file_path": "/var/haproxy/stats.sock", + "do_writes": false, + "do_reloads": false, + "do_socket": false, + "global": [ + "daemon", + "user haproxy", + "group haproxy", + "maxconn 4096", + "log 127.0.0.1 local0", + "log 127.0.0.1 local1 notice", + "stats socket /var/haproxy/stats.sock mode 666 level admin" + ], + "defaults": [ + "log global", + "option dontlognull", + "maxconn 2000", + "retries 3", + "timeout connect 5s", + "timeout client 1m", + "timeout server 1m", + "option redispatch", + "balance roundrobin" + ], + "extra_sections": { + "listen stats :3212": [ + "mode http", + "stats enable", + "stats uri /", + "stats refresh 5s" + ] + } + } +} diff --git a/lib/synapse/service_watcher/yarn_slider.rb b/lib/synapse/service_watcher/yarn_slider.rb new file mode 100644 index 00000000..d39f22f2 --- /dev/null +++ b/lib/synapse/service_watcher/yarn_slider.rb @@ -0,0 +1,127 @@ +require 'synapse/service_watcher/base' +require 'json' +require 'net/http' +require 'resolv' + +class Synapse::ServiceWatcher + class YarnSliderWatcher < BaseWatcher + def start + @check_interval = @discovery['check_interval'] || 10.0 + @connection = nil + @watcher = Thread.new { sleep splay; watch } + end + + def stop + @connection.finish + rescue + # pass + end + + private + + def validate_discovery_opts + required_opts = %w[yarn_api_url application_name] + + required_opts.each do |opt| + if @discovery.fetch(opt, '').empty? + raise ArgumentError, + "a value for services.#{@name}.discovery.#{opt} must be specified" + end + end + end + + def attempt_connection(url) + uri = URI(url) + log.debug "synapse: try connect to #{uri}" + begin + connection = Net::HTTP.new(uri.host, uri.port) + connection.open_timeout = 5 + connection.start + return connection + rescue => ex + log.error "synapse: could not connect to YARN at #{url}: #{ex}" + raise ex + end + end + + def try_find_yarn_app_master_traking_url(name) + begin + yarn_rm_connection = attempt_connection(@discovery['yarn_api_url']) + yarn_apps_path = @discovery.fetch('yarn_apps_path', '/ws/v1/cluster/apps?limit=2&state=RUNNING&applicationTypes=org-apache-slider&applicationTags=name:%20') + yarn_path_resolved = yarn_apps_path + name + log.debug "synapse resolved yarn path #{yarn_path_resolved}" + req = Net::HTTP::Get.new(yarn_path_resolved) + req['Content-Type'] = 'application/json' + req['Accept'] = 'application/json' + response = yarn_rm_connection.request(req) + log.debug "synapse yarn apps response\n#{response.body}" + apps = JSON.parse(response.body).fetch('apps', []) + if apps.nil? + raise 'No yarn application with name ' + name + end + if apps['app'].size > 1 + raise 'More then 1 yarn application with name ' + name + end + return apps['app'].at(0)['trackingUrl'] + rescue => ex + log.warn "synapse: error while watcher try find yarn application: #{ex.inspect}" + log.warn ex.backtrace.join("\n") + raise ex + end + end + + def watch + until @should_exit + retry_count = 0 + start = Time.now + + begin + if @connection.nil? + app_am_url = try_find_yarn_app_master_traking_url(@discovery['application_name']) + log.debug "synapse: try connect to app traking url #{app_am_url}" + @slider_component_instance_url = URI(app_am_url + @discovery.fetch('slider_componentinstance_path', '/ws/v1/slider/publisher/slider/componentinstancedata')) + @connection = attempt_connection(@slider_component_instance_url) + end + + req = Net::HTTP::Get.new(@slider_component_instance_url.request_uri) + req['Content-Type'] = 'application/json' + req['Accept'] = 'application/json' + response = @connection.request(req) + + lookup_sufix = @discovery.fetch('parameter_sufix', '.server_port') + entries = JSON.parse(response.body).fetch('entries', []) + backends = entries.keep_if{ |entry| entry.include? lookup_sufix }.map do |key, value| + { 'name' => key[/(.*)#{lookup_sufix}/,1], + 'host' => value[/(.*):.*/,1], + 'port' => value[/.*:(.*)/,1], + } + end.sort_by { |entry| entry['name'] } + + set_backends(backends) + rescue EOFError + # If the persistent HTTP connection is severed, we can automatically + # retry + log.info "synapse: yarn_slider HTTP API at {@slider_component_instance_url} disappeared, reconnecting..." + retry if (retry_count += 1) == 1 + rescue => e + log.warn "synapse: error in watcher thread: #{e.inspect}" + log.warn e.backtrace.join("\n") + @connection = nil + ensure + elapsed_time = Time.now - start + sleep (@check_interval - elapsed_time) if elapsed_time < @check_interval + end + + @should_exit = true if only_run_once? # for testability + end + end + + def splay + Random.rand(@check_interval) + end + + def only_run_once? + false + end + end +end \ No newline at end of file diff --git a/spec/lib/synapse/service_watcher_yarn_slider_spec.rb b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb new file mode 100644 index 00000000..71143258 --- /dev/null +++ b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb @@ -0,0 +1,228 @@ +require 'spec_helper' +require 'synapse/service_watcher/yarn_slider' + +describe Synapse::ServiceWatcher::YarnSliderWatcher do + let(:mocksynapse) { double() } + let(:yarn_host) { '127.0.0.1' } + let(:yarn_port) { '8088' } + let(:app_name) { 'feeder' } + let(:check_interval) { 11 } + let(:yarn_request_uri) { "#{yarn_host}:#{yarn_port}/ws/v1/cluster/apps?limit=2&state=RUNNING&applicationTypes=org-apache-slider&applicationTags=name:%20feeder" } + let(:slider_request_uri) { "#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011/ws/v1/slider/publisher/slider/componentinstancedata" } + let(:config) do + { + 'name' => 'foo', + 'discovery' => { + 'method' => 'yarn', + 'yarn_api_url' => "http://#{yarn_host}:#{yarn_port}", + 'application_name' => app_name, + 'check_interval' => check_interval, + }, + 'haproxy' => {}, + } + end + let(:slider_response) { {'description' => 'ComponentInstanceData', 'updated' => 0, 'entries' => nil, 'empty' => false} } + let(:yarn_response) { { 'apps' => nil } } + + subject { described_class.new(config, mocksynapse) } + + before do + allow(subject.log).to receive(:warn) + allow(subject.log).to receive(:info) + allow(subject.log).to receive(:debug) + + allow(Thread).to receive(:new).and_yield + allow(subject).to receive(:sleep) + allow(subject).to receive(:only_run_once?).and_return(true) + allow(subject).to receive(:splay).and_return(0) + + stub_request(:get, yarn_request_uri). + with(:headers => { + 'Accept'=>'application/json', + 'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', + 'Content-Type'=>'application/json', + 'User-Agent'=>'Ruby' + }).to_return(:body => JSON.generate(yarn_response)) + + stub_request(:get, slider_request_uri). + with(:headers => { + 'Accept'=>'application/json', + 'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', + 'Content-Type'=>'application/json', + 'User-Agent'=>'Ruby' + }).to_return(:body => JSON.generate(slider_response)) + end + + context 'with a valid argument hash' do + it 'instantiates' do + expect(subject).to be_a(Synapse::ServiceWatcher::YarnSliderWatcher) + end + end + + describe '#watch' do + context 'when synapse cannot connect to yarn' do + before do + allow(Net::HTTP).to receive(:new). + with(yarn_host, yarn_port.to_i). + and_raise(Errno::ECONNREFUSED) + end + + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + end + + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + + context 'when yarn return empty apps' do + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + end + + context 'when slider return empty result' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + expect(a_request(:get, slider_request_uri)).to have_been_made.times(1) + end + end + + context 'when the API path (yarn_api_path) is customized' do + let(:config) do + super().tap do |c| + c['discovery']['yarn_apps_path'] = '/v3/tasks/' + end + end + + let(:yarn_request_uri) { "#{yarn_host}:#{yarn_port}/v3/tasks/#{app_name}" } + + it 'calls the customized path' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + end + + context 'with entries returned from slider' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + let(:slider_response) do + { + "description" => "ComponentInstanceData", + "updated" => 0, + "entries" => { + "container_e02_1480417404363_0011_02_000002.server_port" => "dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.server_port" => "dn0.dev:50006" + }, + "empty" => false + } + end + let(:expected_backend_hash1) do + { + 'name' => 'container_e02_1480417404363_0011_02_000002', 'host' => 'dn0.dev', 'port' => "50009" + } + end + let(:expected_backend_hash2) do + { + 'name' => 'container_e02_1480417404363_0011_02_000003', 'host' => 'dn0.dev', 'port' => "50006" + } + end + + it 'adds the task as a backend' do + expect(subject).to receive(:set_backends).with([expected_backend_hash1,expected_backend_hash2]) + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + expect(a_request(:get, slider_request_uri)).to have_been_made.times(1) + end + + context 'with a entries with out right sufix' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + let(:slider_response) do + { + "description" => "ComponentInstanceData", + "updated" => 0, + "entries" => { + "container_e02_1480417404363_0011_02_000002.host_port" => "dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.host_port" => "dn0.dev:50006" + }, + "empty" => false + } + end + it 'filters tasks that have no startedAt value' do + expect(subject).to receive(:set_backends).with([]) + subject.start + end + end + + context 'when yarn returns invalid response' do + let(:yarn_response) { [] } + it 'does not blow up' do + expect { subject.start }.to_not raise_error + end + end + + context 'when the job takes a long time for some reason' do + let(:job_duration) { 10 } # seconds + + before do + actual_time = Time.now + time_offset = -1 * job_duration + allow(Time).to receive(:now) do + # on first run, return the right time + # subsequently, add in our job_duration offset + actual_time + (time_offset += job_duration) + end + allow(subject).to receive(:set_backends) + end + + it 'only sleeps for the difference' do + expect(subject).to receive(:sleep).with(check_interval - job_duration) + subject.start + end + end + end + end +end \ No newline at end of file From 8e2af76b8fefa733294df34a61dd269fb8ab7ee8 Mon Sep 17 00:00:00 2001 From: myhr Date: Mon, 5 Dec 2016 11:03:26 +0200 Subject: [PATCH 2/2] fix: remove unnecessary encoding that add ruby 2.3. lib by default --- spec/lib/synapse/service_watcher_yarn_slider_spec.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/spec/lib/synapse/service_watcher_yarn_slider_spec.rb b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb index 71143258..2ca5fd77 100644 --- a/spec/lib/synapse/service_watcher_yarn_slider_spec.rb +++ b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb @@ -39,16 +39,14 @@ stub_request(:get, yarn_request_uri). with(:headers => { 'Accept'=>'application/json', - 'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', - 'Content-Type'=>'application/json', + 'Content-Type'=>'application/json', 'User-Agent'=>'Ruby' }).to_return(:body => JSON.generate(yarn_response)) stub_request(:get, slider_request_uri). with(:headers => { 'Accept'=>'application/json', - 'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', - 'Content-Type'=>'application/json', + 'Content-Type'=>'application/json', 'User-Agent'=>'Ruby' }).to_return(:body => JSON.generate(slider_response)) end