This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

REPT-179-7 - Incorporate pull request forward3d#48 for forward3d/rbhive
John Glorioso committed May 15, 2017
1 parent 468396e commit 05a5d85
Showing 4 changed files with 133 additions and 99 deletions.
44 changes: 27 additions & 17 deletions README.md
@@ -17,7 +17,7 @@ It is capable of using the following Thrift transports:

As of version 1.0, it supports asynchronous execution of queries. This allows you to submit
a query, disconnect, then reconnect later to check the status and retrieve the results.
This frees systems of the need to keep a persistent TCP connection.

## About Thrift services and transports

@@ -29,7 +29,7 @@ BufferedTransport.

### Hiveserver2

[Hiveserver2](https://cwiki.apache.org/confluence/display/Hive/Setting+up+HiveServer2)
(the new Thrift interface) can support many concurrent client connections. It is shipped
with Hive 0.10 and later. In Hive 0.10, only BufferedTransport and SaslClientTransport are
supported; starting with Hive 0.12, HTTPClientTransport is also supported.
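The transport support described above can be sketched as a small helper (an illustrative function, not part of the gem; assumed cut-offs: 0.10 ships buffered and SASL, 0.12 adds HTTP):

```ruby
# Transports supported by a given Hive 0.x minor version, per the text above.
def supported_transports(hive_minor)
  base = [:buffered, :sasl]                  # available from Hive 0.10
  hive_minor >= 12 ? base + [:http] : base   # HTTPClientTransport arrives in 0.12
end
```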
@@ -63,7 +63,7 @@ Otherwise you'll get this nasty-looking exception in the logs:
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)

### Other Hive-compatible services

@@ -77,20 +77,20 @@ Since Hiveserver has no options, connection code is very simple:

RBHive.connect('hive.server.address', 10_000) do |connection|
connection.fetch 'SELECT city, country FROM cities'
end
➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}]

### Hiveserver2

Hiveserver2 has several options for how it is run. The connection code takes
a hash with these possible parameters:
* `:transport` - one of `:buffered` (BufferedTransport), `:http` (HTTPClientTransport), or `:sasl` (SaslClientTransport)
* `:hive_version` - the number after the period in the Hive version; e.g. `10`, `11`, `12`, `13` or one of
a set of symbols; see [Hiveserver2 protocol versions](#hiveserver2-protocol-versions) below for details
* `:timeout` - if using BufferedTransport or SaslClientTransport, this is how long the timeout on the socket will be
* `:sasl_params` - if using SaslClientTransport, this is a hash of parameters to set up the SASL connection
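As a sketch of how `:hive_version` maps onto a Thrift protocol version, here is the lookup pattern from `t_c_l_i_connection.rb` with only the two mapping entries visible in the diff below (`10 => 0`, `11 => 1`); the full table in the gem has more entries:

```ruby
# Subset of the gem's HIVE_THRIFT_MAPPING; the real constant covers more versions.
HIVE_THRIFT_MAPPING = {
  10 => 0,
  11 => 1
}

# Unknown versions raise, exactly as in the gem.
def thrift_hive_protocol(version)
  HIVE_THRIFT_MAPPING[version] || raise("Invalid Hive version")
end
```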

If you pass either an empty hash or nil in place of the options (or do not supply them), the connection
is attempted with the Hive version set to 0.10, using `:buffered` as the transport, and a timeout of 1800 seconds.
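Spelled out, those implicit defaults are equivalent to passing this options hash (the hostname is a placeholder, and the connect call needs a live HiveServer2, so it is shown commented out):

```ruby
# The implicit defaults, written out explicitly:
default_options = { :transport => :buffered, :hive_version => 10, :timeout => 1800 }

# Requires a reachable HiveServer2:
# RBHive.tcli_connect('hive.server.address', 10_000, default_options) do |connection|
#   connection.fetch 'SHOW TABLES'
# end
```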

Connecting with the defaults:
@@ -117,7 +117,7 @@ Connecting with a specific Hive version (0.12) and using the `:http` transport:
connection.fetch('SHOW TABLES')
end

We have not tested the SASL connection, as we don't run SASL; pull requests and testing are welcomed.
Connecting with SASL and Kerberos v5:

RBHive.tcli_connect('hive.hadoop.forward.co.uk', 10_000, {
  :transport => :sasl,
  :sasl_params => {
    :mechanism => 'GSSAPI',
    :remote_host => 'example.com',
    :remote_principal => 'hive/[email protected]'
  }
}) do |connection|
connection.fetch("show tables")
end

#### Hiveserver2 protocol versions

@@ -204,7 +214,7 @@ one of the following values and meanings:
| :unknown | The query is in an unknown state
| :pending | The query is ready to run but is not running

There are also the utility methods `async_is_complete?(handles)`, `async_is_running?(handles)`,
`async_is_failed?(handles)` and `async_is_cancelled?(handles)`.
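The handles hash these methods take is the one returned by `async_execute`; its shape, and a typical polling loop, can be sketched as follows (the values and hostname are illustrative, and the connection code needs a live server, so it is commented out):

```ruby
# Shape of the handles hash returned by async_execute (values here are
# placeholders; real ones come from the Thrift operation handle):
handles = {
  :session => nil,                 # the Thrift session handle
  :guid    => 'operation-guid',    # operationId.guid
  :secret  => 'operation-secret'   # operationId.secret
}

# validate_handles! requires exactly these three keys:
valid = [:session, :guid, :secret].all? { |k| handles.key?(k) }

# A polling round trip would look like this:
# RBHive.tcli_connect('hive.server.address', 10_000) do |connection|
#   sleep 5 until connection.async_is_complete?(handles)
#   results = connection.async_fetch(handles, 100)
# end
```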

#### `async_cancel(handles)`
@@ -225,14 +235,14 @@ same way as the normal synchronous methods.

RBHive.connect('hive.server.address', 10_000) do |connection|
connection.fetch 'SELECT city, country FROM cities'
end
➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}]

#### Hiveserver2

RBHive.tcli_connect('hive.server.address', 10_000) do |connection|
connection.fetch 'SELECT city, country FROM cities'
end
➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}]

### Executing a query
@@ -266,13 +276,13 @@ Then for Hiveserver:

RBHive.connect('hive.server.address', 10_000) do |connection|
connection.create_table(table)
end

Or Hiveserver2:

RBHive.tcli_connect('hive.server.address', 10_000) do |connection|
connection.create_table(table)
end

### Modifying table schema

@@ -290,18 +300,18 @@ Then for Hiveserver:

RBHive.connect('hive.server.address') do |connection|
connection.replace_columns(table)
end

Or Hiveserver2:

RBHive.tcli_connect('hive.server.address') do |connection|
connection.replace_columns(table)
end

### Setting properties

You can set various properties for Hive tasks, some of which change how they run. Consult the Apache
Hive documentation and Hadoop's documentation for the various properties that can be set.
For example, you can set the map-reduce job's priority with the following:

connection.set("mapred.job.priority", "VERY_HIGH")
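Under the hood, `set(name, value)` simply issues a `SET name=value` statement via `execute` (visible in the `t_c_l_i_connection.rb` diff below); building the statement is pure string work, and the session usage can be sketched as (hostname is a placeholder, connect call needs a live server):

```ruby
# set(name, value) just executes "SET #{name}=#{value}":
name, value = 'mapred.job.priority', 'VERY_HIGH'
statement = "SET #{name}=#{value}"

# Within a session it would be:
# RBHive.tcli_connect('hive.server.address', 10_000) do |connection|
#   connection.set('mapred.job.priority', 'VERY_HIGH')
# end
```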
@@ -310,15 +320,15 @@

#### Hiveserver

RBHive.connect('hive.hadoop.forward.co.uk', 10_000) {|connection|
result = connection.fetch("describe some_table")
puts result.column_names.inspect
puts result.first.inspect
}

#### Hiveserver2

RBHive.tcli_connect('hive.hadoop.forward.co.uk', 10_000) {|connection|
result = connection.fetch("describe some_table")
puts result.column_names.inspect
puts result.first.inspect
56 changes: 29 additions & 27 deletions lib/rbhive/t_c_l_i_connection.rb
@@ -30,7 +30,7 @@ def flush
end

module RBHive

HIVE_THRIFT_MAPPING = {
10 => 0,
11 => 1,
@@ -84,32 +84,32 @@ class TCLIConnection
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
options ||= {} # backwards compatibility
raise "'options' parameter must be a hash" unless options.is_a?(Hash)

if options[:transport] == :sasl and options[:sasl_params].nil?
raise ":transport is set to :sasl, but no :sasl_params option was supplied"
end

# Defaults to buffered transport, Hive 0.10, 1800 second timeout
options[:transport] ||= :buffered
options[:hive_version] ||= 10
options[:timeout] ||= 1800
@options = options

# Look up the appropriate Thrift protocol version for the supplied Hive version
@thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

@logger = logger
@transport = thrift_transport(server, port)
@protocol = Thrift::BinaryProtocol.new(@transport)
@client = Hive2::Thrift::TCLIService::Client.new(@protocol)
@session = nil
@logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end

def thrift_hive_protocol(version)
HIVE_THRIFT_MAPPING[version] || raise("Invalid Hive version")
end

def thrift_transport(server, port)
@logger.info("Initializing transport #{@options[:transport]}")
case @options[:transport]
@@ -188,7 +188,7 @@ def set(name,value)
@logger.info("Setting #{name}=#{value}")
self.execute("SET #{name}=#{value}")
end

# Async execute
def async_execute(query)
@logger.info("Executing query asynchronously: #{query}")
@@ -204,35 +204,35 @@ def async_execute(query)

# Return handles to get hold of this query / session again
{
session: @session.sessionHandle,
guid: op_handle.operationId.guid,
secret: op_handle.operationId.secret
}
end

# Is the query complete?
def async_is_complete?(handles)
async_state(handles) == :finished
end

# Is the query actually running?
def async_is_running?(handles)
async_state(handles) == :running
end

# Has the query failed?
def async_is_failed?(handles)
async_state(handles) == :error
end

def async_is_cancelled?(handles)
async_state(handles) == :cancelled
end

def async_cancel(handles)
@client.CancelOperation(prepare_cancel_request(handles))
end

# Map states to symbols
def async_state(handles)
response = @client.GetOperationStatus(
@@ -262,18 +262,18 @@ def async_state(handles)
return :state_not_in_protocol
end
end

# Async fetch results from an async execute
def async_fetch(handles, max_rows = 100)
# Can't get data from an unfinished query
unless async_is_complete?(handles)
raise "Can't perform fetch on a query in state: #{async_state(handles)}"
end

# Fetch and return the result rows
fetch_rows(prepare_operation_handle(handles), :first, max_rows)
end

# Performs a query on the server, fetches the results in batches of *batch_size* rows
# and yields the result batches to a given block as arrays of rows.
def async_fetch_in_batch(handles, batch_size = 1000, &block)
@@ -290,12 +290,12 @@ def async_fetch_in_batch(handles, batch_size = 1000, &block)
yield rows
end
end

def async_close_session(handles)
validate_handles!(handles)
@client.CloseSession(Hive2::Thrift::TCloseSessionReq.new( sessionHandle: handles[:session] ))
end

# Pull rows from the query result
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
fetch_req = prepare_fetch_results(op_handle, orientation, max_rows)
@@ -304,7 +304,7 @@ def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
rows = fetch_results.results.rows
TCLIResultSet.new(rows, TCLISchemaDefinition.new(get_schema_for(op_handle), rows.first))
end

# Performs an explain on the supplied query on the server, returns it as an ExplainResult.
# (Only works on 0.12 if you have this patch - https://issues.apache.org/jira/browse/HIVE-5492)
def explain(query)
@@ -323,7 +323,7 @@ def fetch(query, max_rows = 100)

# Get search operation handle to fetch the results
op_handle = exec_result.operationHandle

# Fetch the rows
fetch_rows(op_handle, :first, max_rows)
end
@@ -332,7 +332,7 @@ def fetch(query, max_rows = 100)
# and yields the result batches to a given block as arrays of rows.
def fetch_in_batch(query, batch_size = 1000, &block)
raise "No block given for the batch fetch request!" unless block_given?

# Execute the query and check the result
exec_result = execute(query)
raise_error_if_failed!(exec_result)
@@ -375,7 +375,9 @@ def method_missing(meth, *args)
private

def prepare_open_session(client_protocol)
req = ::Hive2::Thrift::TOpenSessionReq.new( @options[:sasl_params].nil? ? [] : @options[:sasl_params] )
req = ::Hive2::Thrift::TOpenSessionReq.new( @options[:sasl_params].nil? ? [] : {
:username => @options[:sasl_params][:username],
:password => @options[:sasl_params][:password]})
req.client_protocol = client_protocol
req
end
@@ -410,13 +412,13 @@ def prepare_operation_handle(handles)
hasResultSet: false
)
end

def prepare_cancel_request(handles)
Hive2::Thrift::TCancelOperationReq.new(
operationHandle: prepare_operation_handle(handles)
)
end

def validate_handles!(handles)
unless handles.has_key?(:guid) and handles.has_key?(:secret) and handles.has_key?(:session)
raise "Invalid handles hash: #{handles.inspect}"
