Skip to content

Commit

Permalink
Merge pull request #133 from McIntireEvan/master
Browse files Browse the repository at this point in the history
Add FetchLogs function to return Hive Execution logs
  • Loading branch information
beltran authored Feb 25, 2021
2 parents 90fad11 + f303e2b commit c4da3c8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
40 changes: 34 additions & 6 deletions hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"encoding/base64"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
Expand All @@ -13,7 +14,6 @@ import (
"strings"
"sync"
"time"
"log"

"github.com/apache/thrift/lib/go/thrift"
"github.com/beltran/gohive/hiveserver"
Expand Down Expand Up @@ -511,9 +511,9 @@ func (c *Cursor) executeAsync(ctx context.Context, query string) {
}

// Poll returns the current status of the last operation
func (c *Cursor) Poll(getProgres bool) (status *hiveserver.TGetOperationStatusResp) {
func (c *Cursor) Poll(getProgress bool) (status *hiveserver.TGetOperationStatusResp) {
c.Err = nil
progressGet := getProgres
progressGet := getProgress
pollRequest := hiveserver.NewTGetOperationStatusReq()
pollRequest.OperationHandle = c.operationHandle
pollRequest.GetProgressUpdate = &progressGet
Expand All @@ -530,6 +530,34 @@ func (c *Cursor) Poll(getProgres bool) (status *hiveserver.TGetOperationStatusRe
return responsePoll
}

// FetchLogs returns all the Hive execution logs for the latest query up to the current point
func (c *Cursor) FetchLogs() []string {
logRequest := hiveserver.NewTFetchResultsReq()
logRequest.OperationHandle = c.operationHandle
logRequest.Orientation = hiveserver.TFetchOrientation_FETCH_NEXT
logRequest.MaxRows = c.conn.configuration.FetchSize
// FetchType 1 is "logs"
logRequest.FetchType = 1

resp, err := c.conn.client.FetchResults(context.Background(), logRequest)
if err != nil {
return nil
}

// resp contains 1 row, with a column for each line in the log
cols := resp.Results.GetColumns()
var logs []string

for i := 0; i < len(cols); i++ {
col := cols[i].StringVal.Values
for j := 0; j < len(col); j++ {
logs = append(logs, col[j])
}
}

return logs
}

// Finished returns true if the last async operation has finished
func (c *Cursor) Finished() bool {
operationStatus := c.Poll(true)
Expand Down Expand Up @@ -899,15 +927,15 @@ func (c *Cursor) Description() [][]string {
return m
}

// HasMore returns weather more rows can be fetched from the server
// HasMore returns whether more rows can be fetched from the server
func (c *Cursor) HasMore(ctx context.Context) bool {
c.Err = nil
if c.response == nil && c.state != _FINISHED {
c.Err = c.pollUntilData(ctx, 1)
return c.state != _FINISHED || c.totalRows != c.columnIndex
}
// *c.response.HasMoreRows is always false
// so it can be checked and another roundtrip has to be done if etra data has been added
// so it can be checked and another roundtrip has to be done if extra data has been added
if c.totalRows == c.columnIndex && c.state != _FINISHED {
c.Err = c.pollUntilData(ctx, 1)
}
Expand Down Expand Up @@ -1003,7 +1031,7 @@ func (c *Cursor) Cancel() {
return
}

// Close close the cursor
// Close closes the cursor
func (c *Cursor) Close() {
c.Err = c.resetState()
}
Expand Down
20 changes: 19 additions & 1 deletion hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,25 @@ func TestFetchContext(t *testing.T) {
closeAll(t, connection, cursor)
}

func TestFetchLogs(t *testing.T) {
connection, cursor := prepareTable(t, 2, 1000)
cursor.Execute(context.Background(), "SELECT * FROM pokes", false)
if cursor.Error() != nil {
t.Fatal(cursor.Error())
}

logs := cursor.FetchLogs()
if logs == nil {
t.Fatal("Logs should not be nil")
}

if len(logs) == 0 {
t.Fatal("Logs should non-empty")
}

closeAll(t, connection, cursor)
}

func TestHasMoreContext(t *testing.T) {
connection, cursor := prepareTable(t, 2, 1)
cursor.Execute(context.Background(), "SELECT * FROM pokes", false)
Expand Down Expand Up @@ -691,7 +710,6 @@ func TestRowMap(t *testing.T) {
closeAll(t, connection, cursor)
}


func TestRowMapColumnRename(t *testing.T) {
connection, cursor := makeConnection(t, 1000)
cursor.Exec(context.Background(), "create table if not exists t(a int, b int)")
Expand Down

0 comments on commit c4da3c8

Please sign in to comment.