Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snow 895534 add entries to cache #889

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (sr *snowflakeRestful) getAsync(

}

sc := &snowflakeConn{rest: sr, cfg: cfg}
sc := &snowflakeConn{rest: sr, cfg: cfg, queryContextCache: (&queryContextCache{}).init()}
if respd.Success {
if resType == execResultType {
res.insertID = -1
Expand Down
22 changes: 13 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ const (
const privateLinkSuffix = "privatelink.snowflakecomputing.com"

type snowflakeConn struct {
ctx context.Context
cfg *Config
rest *snowflakeRestful
SequenceCounter uint64
telemetry *snowflakeTelemetry
internal InternalClient
ctx context.Context
cfg *Config
rest *snowflakeRestful
SequenceCounter uint64
telemetry *snowflakeTelemetry
internal InternalClient
queryContextCache *queryContextCache
}

var (
Expand Down Expand Up @@ -143,6 +144,8 @@ func (sc *snowflakeConn) exec(
return nil, err
}

sc.queryContextCache.add(data.Data.QueryContext.Entries...)

// handle PUT/GET commands
if isFileTransfer(query) {
data, err = sc.processFileTransfer(ctx, data, query, isInternal)
Expand Down Expand Up @@ -679,9 +682,10 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB

func buildSnowflakeConn(ctx context.Context, config Config) (*snowflakeConn, error) {
sc := &snowflakeConn{
SequenceCounter: 0,
ctx: ctx,
cfg: &config,
SequenceCounter: 0,
ctx: ctx,
cfg: &config,
queryContextCache: (&queryContextCache{}).init(),
}
var st http.RoundTripper = SnowflakeTransport
if sc.cfg.Transporter == nil {
Expand Down
22 changes: 13 additions & 9 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func TestExecWithEmptyRequestID(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}
if _, err := sc.exec(ctx, "", false /* noResult */, false, /* isInternal */
false /* describeOnly */, nil); err != nil {
Expand Down Expand Up @@ -161,8 +162,9 @@ func TestExecWithSpecificRequestID(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}
if _, err := sc.exec(ctx, "", false /* noResult */, false, /* isInternal */
false /* describeOnly */, nil); err != nil {
Expand All @@ -181,8 +183,9 @@ func TestServiceName(t *testing.T) {
}

sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
queryContextCache: (&queryContextCache{}).init(),
}

expectServiceName := serviceNameStub
Expand Down Expand Up @@ -219,9 +222,10 @@ func TestCloseIgnoreSessionGone(t *testing.T) {
FuncCloseSession: closeSessionMock,
}
sc := &snowflakeConn{
cfg: &Config{Params: map[string]*string{}},
rest: sr,
telemetry: testTelemetry,
cfg: &Config{Params: map[string]*string{}},
rest: sr,
telemetry: testTelemetry,
queryContextCache: (&queryContextCache{}).init(),
}

if sc.Close() != nil {
Expand Down
2 changes: 1 addition & 1 deletion connection_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (sc *snowflakeConn) stopHeartBeat() {
if sc.cfg != nil && !sc.isClientSessionKeepAliveEnabled() {
return
}
if sc.rest != nil {
if sc.rest != nil && sc.rest.HeartBeat != nil {
sc.rest.HeartBeat.stop()
}
}
Expand Down
17 changes: 17 additions & 0 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,23 @@ func runDBTest(t *testing.T, test func(dbt *DBTest)) {
test(dbt)
}

func runSnowflakeConnTest(t *testing.T, test func(sc *snowflakeConn)) {
config, err := ParseDSN(dsn)
if err != nil {
t.Error(err)
}
sc, err := buildSnowflakeConn(context.Background(), *config)
if err != nil {
t.Fatal(err)
}
defer sc.Close()
if err = authenticateWithConfig(sc); err != nil {
t.Fatal(err)
}

test(sc)
}

func runningOnAWS() bool {
return os.Getenv("CLOUD_PROVIDER") == "AWS"
}
Expand Down
18 changes: 18 additions & 0 deletions htap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
package gosnowflake

import "sync"

type queryContextEntry struct {
ID int `json:"id"`
Timestamp int64 `json:"timestamp"`
Priority int `json:"priority"`
Context any `json:"context,omitempty"`
}

type queryContextCache struct {
mutex *sync.Mutex
entries []queryContextEntry
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
}

func (qcc *queryContextCache) init() *queryContextCache {
qcc.mutex = &sync.Mutex{}
return qcc
}

func (qcc *queryContextCache) add(qces ...queryContextEntry) {
qcc.mutex.Lock()
defer qcc.mutex.Unlock()
qcc.entries = append(qcc.entries, qces...)
}
47 changes: 47 additions & 0 deletions htap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,50 @@ func trimWhitespaces(s string) string {
"\n", "",
)
}

func TestAddingQueryContextCacheEntry(t *testing.T) {
runSnowflakeConnTest(t, func(sc *snowflakeConn) {
t.Run("First query (may be on empty cache)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
if _, err := sc.Query("SELECT 1", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
entriesAfter := sc.queryContextCache.entries

if !containsNewEntries(entriesAfter, entriesBefore) {
t.Error("no new entries added to the query context cache")
}
})

t.Run("Second query (cache should not be empty)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
if len(entriesBefore) == 0 {
t.Fatalf("cache should not be empty after first query")
}
if _, err := sc.Query("SELECT 1", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
entriesAfter := sc.queryContextCache.entries

if !containsNewEntries(entriesAfter, entriesBefore) {
t.Error("no new entries added to the query context cache")
}
})
})
}

func containsNewEntries(entriesAfter []queryContextEntry, entriesBefore []queryContextEntry) bool {
if len(entriesAfter) > len(entriesBefore) {
return true
}

for _, entryAfter := range entriesAfter {
for _, entryBefore := range entriesBefore {
if !reflect.DeepEqual(entryBefore, entryAfter) {
return true
}
}
}

return false
}
5 changes: 5 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ type execResponseData struct {
Command string `json:"command,omitempty"`
Kind string `json:"kind,omitempty"`
Operation string `json:"operation,omitempty"`

// HTAP
QueryContext struct {
Entries []queryContextEntry `json:"entries,omitempty"`
} `json:"queryContext,omitempty"`
}

type execResponse struct {
Expand Down
Loading