Skip to content

Commit

Permalink
SNOW-895534: Add HTAP query context entries to cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pfus committed Aug 25, 2023
1 parent 190c9ca commit 10f8fa6
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 20 deletions.
2 changes: 1 addition & 1 deletion async.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (sr *snowflakeRestful) getAsync(

}

sc := &snowflakeConn{rest: sr, cfg: cfg}
sc := &snowflakeConn{rest: sr, cfg: cfg, queryContextCache: (&queryContextCache{}).init()}
if respd.Success {
if resType == execResultType {
res.insertID = -1
Expand Down
22 changes: 13 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ const (
const privateLinkSuffix = "privatelink.snowflakecomputing.com"

type snowflakeConn struct {
ctx context.Context
cfg *Config
rest *snowflakeRestful
SequenceCounter uint64
telemetry *snowflakeTelemetry
internal InternalClient
ctx context.Context
cfg *Config
rest *snowflakeRestful
SequenceCounter uint64
telemetry *snowflakeTelemetry
internal InternalClient
queryContextCache *queryContextCache
}

var (
Expand Down Expand Up @@ -143,6 +144,8 @@ func (sc *snowflakeConn) exec(
return nil, err
}

sc.queryContextCache.addAll(data.Data.QueryContext.Entries)

// handle PUT/GET commands
if isFileTransfer(query) {
data, err = sc.processFileTransfer(ctx, data, query, isInternal)
Expand Down Expand Up @@ -679,9 +682,10 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB

func buildSnowflakeConn(ctx context.Context, config Config) (*snowflakeConn, error) {
sc := &snowflakeConn{
SequenceCounter: 0,
ctx: ctx,
cfg: &config,
SequenceCounter: 0,
ctx: ctx,
cfg: &config,
queryContextCache: (&queryContextCache{}).init(),
}
var st http.RoundTripper = SnowflakeTransport
if sc.cfg.Transporter == nil {
Expand Down
22 changes: 13 additions & 9 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func TestExecWithEmptyRequestID(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}
if _, err := sc.exec(ctx, "", false /* noResult */, false, /* isInternal */
false /* describeOnly */, nil); err != nil {
Expand Down Expand Up @@ -161,8 +162,9 @@ func TestExecWithSpecificRequestID(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}
if _, err := sc.exec(ctx, "", false /* noResult */, false, /* isInternal */
false /* describeOnly */, nil); err != nil {
Expand All @@ -181,8 +183,9 @@ func TestServiceName(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}

expectServiceName := serviceNameStub
Expand Down Expand Up @@ -219,9 +222,10 @@ func TestCloseIgnoreSessionGone(t *testing.T) {
FuncCloseSession: closeSessionMock,
}
sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
telemetry: testTelemetry,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
telemetry: testTelemetry,
queryContextCache: (&queryContextCache{}).init(),
}

if sc.Close() != nil {
Expand Down
2 changes: 1 addition & 1 deletion connection_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (sc *snowflakeConn) stopHeartBeat() {
if sc.cfg != nil && !sc.isClientSessionKeepAliveEnabled() {
return
}
if sc.rest != nil {
if sc.rest != nil && sc.rest.HeartBeat != nil {
sc.rest.HeartBeat.stop()
}
}
Expand Down
17 changes: 17 additions & 0 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,23 @@ func runDBTest(t *testing.T, test func(dbt *DBTest)) {
test(dbt)
}

func runSnowflakeConnTest(t *testing.T, test func(sc *snowflakeConn)) {
config, err := ParseDSN(dsn)
if err != nil {
t.Error(err)
}
sc, err := buildSnowflakeConn(context.Background(), *config)
if err != nil {
t.Fatal(err)
}
defer sc.Close()
if err = authenticateWithConfig(sc); err != nil {
t.Fatal(err)
}

test(sc)
}

func runningOnAWS() bool {
return os.Getenv("CLOUD_PROVIDER") == "AWS"
}
Expand Down
18 changes: 18 additions & 0 deletions htap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
package gosnowflake

import "sync"

type queryContextEntry struct {
ID int `json:"id"`
Timestamp int64 `json:"timestamp"`
Priority int `json:"priority"`
Context any `json:"context,omitempty"`
}

type queryContextCache struct {
mutex *sync.Mutex
entries []queryContextEntry
}

func (qcc *queryContextCache) init() *queryContextCache {
qcc.mutex = &sync.Mutex{}
return qcc
}

func (qcc *queryContextCache) addAll(qces []queryContextEntry) {
qcc.mutex.Lock()
defer qcc.mutex.Unlock()
qcc.entries = append(qcc.entries, qces...)
}
47 changes: 47 additions & 0 deletions htap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,50 @@ func trimWhitespaces(s string) string {
"\n", "",
)
}

func TestAddingQueryContextCacheEntry(t *testing.T) {
runSnowflakeConnTest(t, func(sc *snowflakeConn) {
t.Run("First query (may be on empty cache)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
if _, err := sc.Query("SELECT 1", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
entriesAfter := sc.queryContextCache.entries

if !containsNewEntries(entriesAfter, entriesBefore) {
t.Error("no new entries added to the query context cache")
}
})

t.Run("Second query (cache should not be empty)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
if len(entriesBefore) == 0 {
t.Fatalf("cache should not be empty after first query")
}
if _, err := sc.Query("SELECT 1", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
entriesAfter := sc.queryContextCache.entries

if !containsNewEntries(entriesAfter, entriesBefore) {
t.Error("no new entries added to the query context cache")
}
})
})
}

func containsNewEntries(entriesAfter []queryContextEntry, entriesBefore []queryContextEntry) bool {
if len(entriesAfter) > len(entriesBefore) {
return true
}

for _, entryAfter := range entriesAfter {
for _, entryBefore := range entriesBefore {
if !reflect.DeepEqual(entryBefore, entryAfter) {
return true
}
}
}

return false
}
5 changes: 5 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ type execResponseData struct {
Command string `json:"command,omitempty"`
Kind string `json:"kind,omitempty"`
Operation string `json:"operation,omitempty"`

// HTAP
QueryContext struct {
Entries []queryContextEntry `json:"entries,omitempty"`
} `json:"queryContext,omitempty"`
}

type execResponse struct {
Expand Down

0 comments on commit 10f8fa6

Please sign in to comment.