diff --git a/README.md b/README.md index b6fddb6..14ae4c5 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ It is capable of using the following Thrift transports: As of version 1.0, it supports asynchronous execution of queries. This allows you to submit a query, disconnect, then reconnect later to check the status and retrieve the results. -This frees systems of the need to keep a persistent TCP connection. +This frees systems of the need to keep a persistent TCP connection. ## About Thrift services and transports @@ -29,7 +29,7 @@ BufferedTransport. ### Hiveserver2 -[Hiveserver2](https://cwiki.apache.org/confluence/display/Hive/Setting+up+HiveServer2) +[Hiveserver2](https://cwiki.apache.org/confluence/display/Hive/Setting+up+HiveServer2) (the new Thrift interface) can support many concurrent client connections. It is shipped with Hive 0.10 and later. In Hive 0.10, only BufferedTranport and SaslClientTransport are supported; starting with Hive 0.12, HTTPClientTransport is also supported. @@ -63,7 +63,7 @@ Otherwise you'll get this nasty-looking exception in the logs: at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) - at java.lang.Thread.run(Thread.java:662) + at java.lang.Thread.run(Thread.java:662) ### Other Hive-compatible services @@ -77,7 +77,7 @@ Since Hiveserver has no options, connection code is very simple: RBHive.connect('hive.server.address', 10_000) do |connection| connection.fetch 'SELECT city, country FROM cities' - end + end ➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}] ### Hiveserver2 @@ -85,12 +85,12 @@ Since Hiveserver has no options, connection code is very simple: Hiveserver2 has several options with how it is run. The connection code takes a hash with these possible parameters: * `:transport` - one of `:buffered` (BufferedTransport), `:http` (HTTPClientTransport), or `:sasl` (SaslClientTransport) -* `:hive_version` - the number after the period in the Hive version; e.g. `10`, `11`, `12`, `13` or one of +* `:hive_version` - the number after the period in the Hive version; e.g. `10`, `11`, `12`, `13` or one of a set of symbols; see [Hiveserver2 protocol versions](#hiveserver2-protocol-versions) below for details * `:timeout` - if using BufferedTransport or SaslClientTransport, this is how long the timeout on the socket will be * `:sasl_params` - if using SaslClientTransport, this is a hash of parameters to set up the SASL connection -If you pass either an empty hash or nil in place of the options (or do not supply them), the connection +If you pass either an empty hash or nil in place of the options (or do not supply them), the connection is attempted with the Hive version set to 0.10, using `:buffered` as the transport, and a timeout of 1800 seconds. Connecting with the defaults: @@ -117,7 +117,17 @@ Connecting with a specific Hive version (0.12) and using the `:http` transport: connection.fetch('SHOW TABLES') end -We have not tested the SASL connection, as we don't run SASL; pull requests and testing are welcomed. +Connecting with SASL and Kerberos v5: + + RBHive.tcli_connect('hive.hadoop.forward.co.uk', 10_000, { + :transport => :sasl, + :sasl_params => { + :mechanism => 'GSSAPI', + :remote_host => 'example.com', + :remote_principal => 'hive/example.com@EXAMPLE.COM' + ) do |connection| + connection.fetch("show tables") + end #### Hiveserver2 protocol versions @@ -204,7 +214,7 @@ one of the following values and meanings: | :unknown | The query is in an unknown state | :pending | The query is ready to run but is not running -There are also the utility methods `async_is_complete?(handles)`, `async_is_running?(handles)`, +There are also the utility methods `async_is_complete?(handles)`, `async_is_running?(handles)`, `async_is_failed?(handles)` and `async_is_cancelled?(handles)`. #### `async_cancel(handles)` @@ -225,14 +235,14 @@ same way as the normal synchronous methods. RBHive.connect('hive.server.address', 10_000) do |connection| connection.fetch 'SELECT city, country FROM cities' - end + end ➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}] #### Hiveserver2 RBHive.tcli_connect('hive.server.address', 10_000) do |connection| connection.fetch 'SELECT city, country FROM cities' - end + end ➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}] ### Executing a query @@ -266,13 +276,13 @@ Then for Hiveserver: RBHive.connect('hive.server.address', 10_000) do |connection| connection.create_table(table) - end + end Or Hiveserver2: RBHive.tcli_connect('hive.server.address', 10_000) do |connection| connection.create_table(table) - end + end ### Modifying table schema @@ -290,18 +300,18 @@ Then for Hiveserver: RBHive.connect('hive.server.address') do |connection| connection.replace_columns(table) - end + end Or Hiveserver2: RBHive.tcli_connect('hive.server.address') do |connection| connection.replace_columns(table) - end + end ### Setting properties You can set various properties for Hive tasks, some of which change how they run. Consult the Apache -Hive documentation and Hadoop's documentation for the various properties that can be set. +Hive documentation and Hadoop's documentation for the various properties that can be set. For example, you can set the map-reduce job's priority with the following: connection.set("mapred.job.priority", "VERY_HIGH") @@ -310,7 +320,7 @@ For example, you can set the map-reduce job's priority with the following: #### Hiveserver - RBHive.connect('hive.hadoop.forward.co.uk', 10_000) {|connection| + RBHive.connect('hive.hadoop.forward.co.uk', 10_000) {|connection| result = connection.fetch("describe some_table") puts result.column_names.inspect puts result.first.inspect @@ -318,7 +328,7 @@ For example, you can set the map-reduce job's priority with the following: #### Hiveserver2 - RBHive.tcli_connect('hive.hadoop.forward.co.uk', 10_000) {|connection| + RBHive.tcli_connect('hive.hadoop.forward.co.uk', 10_000) {|connection| result = connection.fetch("describe some_table") puts result.column_names.inspect puts result.first.inspect diff --git a/lib/rbhive/t_c_l_i_connection.rb b/lib/rbhive/t_c_l_i_connection.rb index e1839e9..cc01fc1 100644 --- a/lib/rbhive/t_c_l_i_connection.rb +++ b/lib/rbhive/t_c_l_i_connection.rb @@ -30,7 +30,7 @@ def flush end module RBHive - + HIVE_THRIFT_MAPPING = { 10 => 0, 11 => 1, @@ -84,20 +84,20 @@ class TCLIConnection def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) options ||= {} # backwards compatibility raise "'options' parameter must be a hash" unless options.is_a?(Hash) - + if options[:transport] == :sasl and options[:sasl_params].nil? raise ":transport is set to :sasl, but no :sasl_params option was supplied" end - + # Defaults to buffered transport, Hive 0.10, 1800 second timeout options[:transport] ||= :buffered options[:hive_version] ||= 10 options[:timeout] ||= 1800 @options = options - + # Look up the appropriate Thrift protocol version for the supplied Hive version @thrift_protocol_version = thrift_hive_protocol(options[:hive_version]) - + @logger = logger @transport = thrift_transport(server, port) @protocol = Thrift::BinaryProtocol.new(@transport) @@ -105,11 +105,11 @@ def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) @session = nil @logger.info("Connecting to HiveServer2 #{server} on port #{port}") end - + def thrift_hive_protocol(version) HIVE_THRIFT_MAPPING[version] || raise("Invalid Hive version") end - + def thrift_transport(server, port) @logger.info("Initializing transport #{@options[:transport]}") case @options[:transport] @@ -188,7 +188,7 @@ def set(name,value) @logger.info("Setting #{name}=#{value}") self.execute("SET #{name}=#{value}") end - + # Async execute def async_execute(query) @logger.info("Executing query asynchronously: #{query}") @@ -204,35 +204,35 @@ def async_execute(query) # Return handles to get hold of this query / session again { - session: @session.sessionHandle, - guid: op_handle.operationId.guid, + session: @session.sessionHandle, + guid: op_handle.operationId.guid, secret: op_handle.operationId.secret } end - + # Is the query complete? def async_is_complete?(handles) async_state(handles) == :finished end - + # Is the query actually running? def async_is_running?(handles) async_state(handles) == :running end - + # Has the query failed? def async_is_failed?(handles) async_state(handles) == :error end - + def async_is_cancelled?(handles) async_state(handles) == :cancelled end - + def async_cancel(handles) @client.CancelOperation(prepare_cancel_request(handles)) end - + # Map states to symbols def async_state(handles) response = @client.GetOperationStatus( @@ -262,18 +262,18 @@ def async_state(handles) return :state_not_in_protocol end end - + # Async fetch results from an async execute def async_fetch(handles, max_rows = 100) # Can't get data from an unfinished query unless async_is_complete?(handles) raise "Can't perform fetch on a query in state: #{async_state(handles)}" end - + # Fetch and fetch_rows(prepare_operation_handle(handles), :first, max_rows) end - + # Performs a query on the server, fetches the results in batches of *batch_size* rows # and yields the result batches to a given block as arrays of rows. def async_fetch_in_batch(handles, batch_size = 1000, &block) @@ -290,12 +290,12 @@ def async_fetch_in_batch(handles, batch_size = 1000, &block) yield rows end end - + def async_close_session(handles) validate_handles!(handles) @client.CloseSession(Hive2::Thrift::TCloseSessionReq.new( sessionHandle: handles[:session] )) end - + # Pull rows from the query result def fetch_rows(op_handle, orientation = :first, max_rows = 1000) fetch_req = prepare_fetch_results(op_handle, orientation, max_rows) @@ -304,7 +304,7 @@ def fetch_rows(op_handle, orientation = :first, max_rows = 1000) rows = fetch_results.results.rows TCLIResultSet.new(rows, TCLISchemaDefinition.new(get_schema_for(op_handle), rows.first)) end - + # Performs a explain on the supplied query on the server, returns it as a ExplainResult. # (Only works on 0.12 if you have this patch - https://issues.apache.org/jira/browse/HIVE-5492) def explain(query) @@ -323,7 +323,7 @@ def fetch(query, max_rows = 100) # Get search operation handle to fetch the results op_handle = exec_result.operationHandle - + # Fetch the rows fetch_rows(op_handle, :first, max_rows) end @@ -332,7 +332,7 @@ def fetch(query, max_rows = 100) # and yields the result batches to a given block as arrays of rows. def fetch_in_batch(query, batch_size = 1000, &block) raise "No block given for the batch fetch request!" unless block_given? - + # Execute the query and check the result exec_result = execute(query) raise_error_if_failed!(exec_result) @@ -375,7 +375,9 @@ def method_missing(meth, *args) private def prepare_open_session(client_protocol) - req = ::Hive2::Thrift::TOpenSessionReq.new( @options[:sasl_params].nil? ? [] : @options[:sasl_params] ) + req = ::Hive2::Thrift::TOpenSessionReq.new( @options[:sasl_params].nil? ? [] : { + :username => @options[:sasl_params][:username], + :password => @options[:sasl_params][:password]}) req.client_protocol = client_protocol req end @@ -410,13 +412,13 @@ def prepare_operation_handle(handles) hasResultSet: false ) end - + def prepare_cancel_request(handles) Hive2::Thrift::TCancelOperationReq.new( operationHandle: prepare_operation_handle(handles) ) end - + def validate_handles!(handles) unless handles.has_key?(:guid) and handles.has_key?(:secret) and handles.has_key?(:session) raise "Invalid handles hash: #{handles.inspect}" diff --git a/lib/thrift/sasl_client_transport.rb b/lib/thrift/sasl_client_transport.rb index 7361a3a..226215b 100644 --- a/lib/thrift/sasl_client_transport.rb +++ b/lib/thrift/sasl_client_transport.rb @@ -1,10 +1,9 @@ -module Thrift - class SaslClientTransport < BufferedTransport - attr_reader :challenge +require 'gssapi' +module Thrift + class SaslClientTransport < FramedTransport STATUS_BYTES = 1 PAYLOAD_LENGTH_BYTES = 4 - AUTH_MECHANISM = 'PLAIN' NEGOTIATION_STATUS = { START: 0x01, OK: 0x02, @@ -15,76 +14,99 @@ class SaslClientTransport < BufferedTransport def initialize(transport, sasl_params={}) super(transport) - @challenge = nil + @sasl_complete = nil @sasl_username = sasl_params.fetch(:username, 'anonymous') @sasl_password = sasl_params.fetch(:password, 'anonymous') - end + @sasl_mechanism = sasl_params.fetch(:mechanism, 'PLAIN') - def read(sz) - len, = @transport.read(PAYLOAD_LENGTH_BYTES).unpack('l>') if @rbuf.nil? - sz = len if len && sz > len - @index += sz - ret = @rbuf.slice(@index - sz, sz) || Bytes.empty_byte_buffer - if ret.length == 0 - @rbuf = @transport.read(len) rescue Bytes.empty_byte_buffer - @index = sz - ret = @rbuf.slice(0, sz) || Bytes.empty_byte_buffer + unless ['PLAIN', 'GSSAPI'].include? @sasl_mechanism + raise "Unknown SASL mechanism: #{@sasl_mechanism}" end - ret - end - def read_byte - reset_buffer! if @index >= @rbuf.size - @index += 1 - Bytes.get_string_byte(@rbuf, @index - 1) + if @sasl_mechanism == 'GSSAPI' + @sasl_remote_principal = sasl_params[:remote_principal] + @sasl_remote_host = sasl_params[:remote_host] + @gsscli = GSSAPI::Simple.new(@sasl_remote_host, @sasl_remote_principal) + end end - def read_into_buffer(buffer, size) - i = 0 - while i < size - reset_buffer! if @index >= @rbuf.size - byte = Bytes.get_string_byte(@rbuf, @index) - Bytes.set_string_byte(buffer, i, byte) - @index += 1 - i += 1 + def open + super + + case @sasl_mechanism + when 'PLAIN' + handshake_plain! + when 'GSSAPI' + handshake_gssapi! end - i end - def write(buf) - initiate_hand_shake if @challenge.nil? - header = [buf.length].pack('l>') - @wbuf << (header + Bytes.force_binary_encoding(buf)) - end + private - protected + def handshake_plain! + token = "[#{@sasl_mechanism}]\u0000#{@sasl_username}\u0000#{@sasl_password}" + write_handshake_message(NEGOTIATION_STATUS[:START], @sasl_mechanism) + write_handshake_message(NEGOTIATION_STATUS[:OK], token) - def initiate_hand_shake - header = [NEGOTIATION_STATUS[:START], AUTH_MECHANISM.length].pack('cl>') - @transport.write header + AUTH_MECHANISM - message = "[#{AUTH_MECHANISM}]\u0000#{@sasl_username}\u0000#{@sasl_password}" - header = [NEGOTIATION_STATUS[:OK], message.length].pack('cl>') - @transport.write header + message - status, len = @transport.read(STATUS_BYTES + PAYLOAD_LENGTH_BYTES).unpack('cl>') + status, msg = read_handshake_message case status - when NEGOTIATION_STATUS[:BAD], NEGOTIATION_STATUS[:ERROR] - raise @transport.to_io.read(len) when NEGOTIATION_STATUS[:COMPLETE] - @challenge = @transport.to_io.read len + @sasl_complete = true when NEGOTIATION_STATUS[:OK] raise "Failed to complete challenge exchange: only NONE supported currently" end end - private + def handshake_gssapi! + token = @gsscli.init_context + write_handshake_message(NEGOTIATION_STATUS[:START], @sasl_mechanism) + write_handshake_message(NEGOTIATION_STATUS[:OK], token) - def reset_buffer! - len, = @transport.read(PAYLOAD_LENGTH_BYTES).unpack('l>') - @rbuf = @transport.read(len) - while @rbuf.size < len - @rbuf << @transport.read(len - @rbuf.size) + status, msg = read_handshake_message + case status + when NEGOTIATION_STATUS[:COMPLETE] + raise "Unexpected COMPLETE from server" + when NEGOTIATION_STATUS[:OK] + unless @gsscli.init_context(msg) + raise "GSSAPI: challenge provided by server could not be verified" + end + + write_handshake_message(NEGOTIATION_STATUS[:OK], "") + + status, msg = read_handshake_message + case status + when NEGOTIATION_STATUS[:COMPLETE] + raise "Unexpected COMPLETE from server" + when NEGOTIATION_STATUS[:OK] + unwrapped = @gsscli.unwrap_message(msg) + rewrapped = @gsscli.wrap_message(unwrapped) + + write_handshake_message(NEGOTIATION_STATUS[:COMPLETE], rewrapped) + + status, msg = read_handshake_message + case status + when NEGOTIATION_STATUS[:COMPLETE] + @sasl_complete = true + when NEGOTIATION_STATUS[:OK] + raise "Failed to complete GSS challenge exchange" + end + end + end + end + + def read_handshake_message + status, len = @transport.read(STATUS_BYTES + PAYLOAD_LENGTH_BYTES).unpack('cl>') + body = @transport.to_io.read(len) + if [NEGOTIATION_STATUS[:BAD], NEGOTIATION_STATUS[:ERROR]].include?(status) + raise "Exception from server: #{body}" end - @index = 0 + + [status, body] + end + + def write_handshake_message(status, message) + header = [status, message.length].pack('cl>') + @transport.write(header + message) end end @@ -93,5 +115,4 @@ def get_transport(transport) return SaslClientTransport.new(transport) end end - -end +end \ No newline at end of file diff --git a/rbhive.gemspec b/rbhive.gemspec index 5f7ddaa..5e75967 100644 --- a/rbhive.gemspec +++ b/rbhive.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] spec.add_dependency('thrift', '= 0.10') + spec.add_dependency('gssapi', '~> 1.2') spec.add_dependency('json') spec.add_development_dependency 'rake'