Skip to content

Commit

Permalink
SNOW-895535: Prioritise entries in query context cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pfus committed Aug 28, 2023
1 parent 461b653 commit 4123b65
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 5 deletions.
43 changes: 41 additions & 2 deletions htap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gosnowflake

import "sync"
import (
"sort"
"sync"
)

type queryContextEntry struct {
ID int `json:"id"`
Expand All @@ -22,5 +25,41 @@ func (qcc *queryContextCache) init() *queryContextCache {
func (qcc *queryContextCache) add(qces ...queryContextEntry) {
qcc.mutex.Lock()
defer qcc.mutex.Unlock()
qcc.entries = append(qcc.entries, qces...)
var newQcesBasedOnPriorities []queryContextEntry
for _, newQce := range qces {
logger.Debugf("adding query context: %v", newQce)
newQceProcessed := false
for existingQceIdx, existingQce := range qcc.entries {
if newQce.ID == existingQce.ID {
newQceProcessed = true
if newQce.Timestamp > existingQce.Timestamp {
if newQce.Priority == existingQce.Priority {
qcc.entries[existingQceIdx].Timestamp = newQce.Timestamp
qcc.entries[existingQceIdx].Context = newQce.Context
} else {
qcc.entries[existingQceIdx] = newQce
}
} else if newQce.Timestamp == existingQce.Timestamp {
if newQce.Priority != existingQce.Priority {
qcc.entries[existingQceIdx].Priority = newQce.Priority
qcc.entries[existingQceIdx].Context = newQce.Context
}
}
} else {
if newQce.Priority == existingQce.Priority {
qcc.entries[existingQceIdx].ID = newQce.ID
qcc.entries[existingQceIdx].Timestamp = newQce.Timestamp
qcc.entries[existingQceIdx].Context = newQce.Context
newQceProcessed = true
}
}
}
if !newQceProcessed {
qcc.entries = append(qcc.entries, newQce)
}
}
qcc.entries = append(qcc.entries, newQcesBasedOnPriorities...)
sort.Slice(qcc.entries, func(idx1, idx2 int) bool {
return qcc.entries[idx1].Priority < qcc.entries[idx2].Priority
})
}
158 changes: 155 additions & 3 deletions htap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,161 @@ func trimWhitespaces(s string) string {
)
}

func TestSortingByPriority(t *testing.T) {
qcc := (&queryContextCache{}).init()

qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"}
qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"}
qceC := queryContextEntry{ID: 14, Timestamp: 125, Priority: 6, Context: "c"}
qceD := queryContextEntry{ID: 15, Timestamp: 126, Priority: 8, Context: "d"}

t.Run("Add to empty cache", func(t *testing.T) {
qcc.add(qceA)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA}) {
t.Fatalf("no entries added to cache. %v", qcc.entries)
}
})
t.Run("Add another entry with different id, timestamp and priority - greater priority", func(t *testing.T) {
qcc.add(qceB)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
t.Run("Add another entry with different id, timestamp and priority - lesser priority", func(t *testing.T) {
qcc.add(qceC)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceA, qceB}) {
t.Fatalf("entry with lesser priority should be added at the beginninig. %v", qcc.entries)
}
})
t.Run("Add another entry with different id, timestamp and priority - priority in the middle", func(t *testing.T) {
qcc.add(qceD)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceA, qceD, qceB}) {
t.Fatalf("entry with priority in the middle should be added at the middle. %v", qcc.entries)
}
})
}

func TestAddingQcesWithTheSameIdAndLaterTimestamp(t *testing.T) {
qcc := (&queryContextCache{}).init()

qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"}
qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"}
qceC := queryContextEntry{ID: 12, Timestamp: 125, Priority: 6, Context: "c"}
qceD := queryContextEntry{ID: 12, Timestamp: 126, Priority: 6, Context: "d"}

t.Run("Add to empty cache", func(t *testing.T) {
qcc.add(qceA)
qcc.add(qceB)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("no entries added to cache. %v", qcc.entries)
}
})
t.Run("Add another entry with different priority", func(t *testing.T) {
qcc.add(qceC)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
t.Run("Add another entry with same priority", func(t *testing.T) {
qcc.add(qceD)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceD, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
}

func TestAddingQcesWithTheSameIdAndSameTimestamp(t *testing.T) {
qcc := (&queryContextCache{}).init()

qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"}
qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"}
qceC := queryContextEntry{ID: 12, Timestamp: 123, Priority: 6, Context: "c"}
qceD := queryContextEntry{ID: 12, Timestamp: 123, Priority: 6, Context: "d"}

t.Run("Add to empty cache", func(t *testing.T) {
qcc.add(qceA)
qcc.add(qceB)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("no entries added to cache. %v", qcc.entries)
}
})
t.Run("Add another entry with different priority", func(t *testing.T) {
qcc.add(qceC)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
t.Run("Add another entry with same priority", func(t *testing.T) { // TODO should anything happen here?
qcc.add(qceD)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
}

func TestAddingQcesWithTheSameIdAndEarlierTimestamp(t *testing.T) {
qcc := (&queryContextCache{}).init()

qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"}
qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"}
qceC := queryContextEntry{ID: 12, Timestamp: 122, Priority: 6, Context: "c"}
qceD := queryContextEntry{ID: 12, Timestamp: 122, Priority: 7, Context: "d"}

t.Run("Add to empty cache", func(t *testing.T) {
qcc.add(qceA)
qcc.add(qceB)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("no entries added to cache. %v", qcc.entries)
}
})
t.Run("Add another entry with different priority", func(t *testing.T) {
qcc.add(qceC)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
t.Run("Add another entry with same priority", func(t *testing.T) { // TODO should anything happen here?
qcc.add(qceD)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
}

func TestAddingQcesWithDifferentId(t *testing.T) {
qcc := (&queryContextCache{}).init()

qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"}
qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"}
qceC := queryContextEntry{ID: 14, Timestamp: 122, Priority: 7, Context: "c"}
qceD := queryContextEntry{ID: 15, Timestamp: 122, Priority: 6, Context: "d"}

t.Run("Add to empty cache", func(t *testing.T) {
qcc.add(qceA)
qcc.add(qceB)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) {
t.Fatalf("no entries added to cache. %v", qcc.entries)
}
})
t.Run("Add another entry with same priority", func(t *testing.T) {
qcc.add(qceC)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
t.Run("Add another entry with different priority", func(t *testing.T) {
qcc.add(qceD)
if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceD, qceC, qceB}) {
t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries)
}
})
}

func TestAddingQueryContextCacheEntry(t *testing.T) {
runSnowflakeConnTest(t, func(sc *snowflakeConn) {
t.Run("First query (may be on empty cache)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
entriesBefore := make([]queryContextEntry, len(sc.queryContextCache.entries))
copy(entriesBefore, sc.queryContextCache.entries)
if _, err := sc.Query("SELECT 1", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
Expand All @@ -116,11 +267,12 @@ func TestAddingQueryContextCacheEntry(t *testing.T) {
})

t.Run("Second query (cache should not be empty)", func(t *testing.T) {
entriesBefore := sc.queryContextCache.entries
entriesBefore := make([]queryContextEntry, len(sc.queryContextCache.entries))
copy(entriesBefore, sc.queryContextCache.entries)
if len(entriesBefore) == 0 {
t.Fatalf("cache should not be empty after first query")
}
if _, err := sc.Query("SELECT 1", nil); err != nil {
if _, err := sc.Query("SELECT 2", nil); err != nil {
t.Fatalf("cannot query. %v", err)
}
entriesAfter := sc.queryContextCache.entries
Expand Down

0 comments on commit 4123b65

Please sign in to comment.