From 4123b65122e2dacf09fc8ceb24c52b98aed0b3ad Mon Sep 17 00:00:00 2001 From: Piotr Fus Date: Mon, 28 Aug 2023 07:37:14 +0200 Subject: [PATCH] SNOW-895535: Prioritise entries in query context cache --- htap.go | 43 +++++++++++++- htap_test.go | 158 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 196 insertions(+), 5 deletions(-) diff --git a/htap.go b/htap.go index 587f830aa..9d03e377a 100644 --- a/htap.go +++ b/htap.go @@ -1,6 +1,9 @@ package gosnowflake -import "sync" +import ( + "sort" + "sync" +) type queryContextEntry struct { ID int `json:"id"` @@ -22,5 +25,41 @@ func (qcc *queryContextCache) init() *queryContextCache { func (qcc *queryContextCache) add(qces ...queryContextEntry) { qcc.mutex.Lock() defer qcc.mutex.Unlock() - qcc.entries = append(qcc.entries, qces...) + var newQcesBasedOnPriorities []queryContextEntry + for _, newQce := range qces { + logger.Debugf("adding query context: %v", newQce) + newQceProcessed := false + for existingQceIdx, existingQce := range qcc.entries { + if newQce.ID == existingQce.ID { + newQceProcessed = true + if newQce.Timestamp > existingQce.Timestamp { + if newQce.Priority == existingQce.Priority { + qcc.entries[existingQceIdx].Timestamp = newQce.Timestamp + qcc.entries[existingQceIdx].Context = newQce.Context + } else { + qcc.entries[existingQceIdx] = newQce + } + } else if newQce.Timestamp == existingQce.Timestamp { + if newQce.Priority != existingQce.Priority { + qcc.entries[existingQceIdx].Priority = newQce.Priority + qcc.entries[existingQceIdx].Context = newQce.Context + } + } + } else { + if newQce.Priority == existingQce.Priority { + qcc.entries[existingQceIdx].ID = newQce.ID + qcc.entries[existingQceIdx].Timestamp = newQce.Timestamp + qcc.entries[existingQceIdx].Context = newQce.Context + newQceProcessed = true + } + } + } + if !newQceProcessed { + qcc.entries = append(qcc.entries, newQce) + } + } + qcc.entries = append(qcc.entries, newQcesBasedOnPriorities...) + sort.Slice(qcc.entries, func(idx1, idx2 int) bool { + return qcc.entries[idx1].Priority < qcc.entries[idx2].Priority + }) } diff --git a/htap_test.go b/htap_test.go index 39fae90f0..a967910bf 100644 --- a/htap_test.go +++ b/htap_test.go @@ -101,10 +101,161 @@ func trimWhitespaces(s string) string { ) } +func TestSortingByPriority(t *testing.T) { + qcc := (&queryContextCache{}).init() + + qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"} + qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"} + qceC := queryContextEntry{ID: 14, Timestamp: 125, Priority: 6, Context: "c"} + qceD := queryContextEntry{ID: 15, Timestamp: 126, Priority: 8, Context: "d"} + + t.Run("Add to empty cache", func(t *testing.T) { + qcc.add(qceA) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA}) { + t.Fatalf("no entries added to cache. %v", qcc.entries) + } + }) + t.Run("Add another entry with different id, timestamp and priority - greater priority", func(t *testing.T) { + qcc.add(qceB) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) + t.Run("Add another entry with different id, timestamp and priority - lesser priority", func(t *testing.T) { + qcc.add(qceC) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceA, qceB}) { + t.Fatalf("entry with lesser priority should be added at the beginninig. %v", qcc.entries) + } + }) + t.Run("Add another entry with different id, timestamp and priority - priority in the middle", func(t *testing.T) { + qcc.add(qceD) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceA, qceD, qceB}) { + t.Fatalf("entry with priority in the middle should be added at the middle. %v", qcc.entries) + } + }) +} + +func TestAddingQcesWithTheSameIdAndLaterTimestamp(t *testing.T) { + qcc := (&queryContextCache{}).init() + + qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"} + qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"} + qceC := queryContextEntry{ID: 12, Timestamp: 125, Priority: 6, Context: "c"} + qceD := queryContextEntry{ID: 12, Timestamp: 126, Priority: 6, Context: "d"} + + t.Run("Add to empty cache", func(t *testing.T) { + qcc.add(qceA) + qcc.add(qceB) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("no entries added to cache. %v", qcc.entries) + } + }) + t.Run("Add another entry with different priority", func(t *testing.T) { + qcc.add(qceC) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) + t.Run("Add another entry with same priority", func(t *testing.T) { + qcc.add(qceD) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceD, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) +} + +func TestAddingQcesWithTheSameIdAndSameTimestamp(t *testing.T) { + qcc := (&queryContextCache{}).init() + + qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"} + qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"} + qceC := queryContextEntry{ID: 12, Timestamp: 123, Priority: 6, Context: "c"} + qceD := queryContextEntry{ID: 12, Timestamp: 123, Priority: 6, Context: "d"} + + t.Run("Add to empty cache", func(t *testing.T) { + qcc.add(qceA) + qcc.add(qceB) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("no entries added to cache. %v", qcc.entries) + } + }) + t.Run("Add another entry with different priority", func(t *testing.T) { + qcc.add(qceC) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) + t.Run("Add another entry with same priority", func(t *testing.T) { // TODO should anything happen here? + qcc.add(qceD) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) +} + +func TestAddingQcesWithTheSameIdAndEarlierTimestamp(t *testing.T) { + qcc := (&queryContextCache{}).init() + + qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"} + qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"} + qceC := queryContextEntry{ID: 12, Timestamp: 122, Priority: 6, Context: "c"} + qceD := queryContextEntry{ID: 12, Timestamp: 122, Priority: 7, Context: "d"} + + t.Run("Add to empty cache", func(t *testing.T) { + qcc.add(qceA) + qcc.add(qceB) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("no entries added to cache. %v", qcc.entries) + } + }) + t.Run("Add another entry with different priority", func(t *testing.T) { + qcc.add(qceC) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) + t.Run("Add another entry with same priority", func(t *testing.T) { // TODO should anything happen here? + qcc.add(qceD) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) +} + +func TestAddingQcesWithDifferentId(t *testing.T) { + qcc := (&queryContextCache{}).init() + + qceA := queryContextEntry{ID: 12, Timestamp: 123, Priority: 7, Context: "a"} + qceB := queryContextEntry{ID: 13, Timestamp: 124, Priority: 9, Context: "b"} + qceC := queryContextEntry{ID: 14, Timestamp: 122, Priority: 7, Context: "c"} + qceD := queryContextEntry{ID: 15, Timestamp: 122, Priority: 6, Context: "d"} + + t.Run("Add to empty cache", func(t *testing.T) { + qcc.add(qceA) + qcc.add(qceB) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceA, qceB}) { + t.Fatalf("no entries added to cache. %v", qcc.entries) + } + }) + t.Run("Add another entry with same priority", func(t *testing.T) { + qcc.add(qceC) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceC, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) + t.Run("Add another entry with different priority", func(t *testing.T) { + qcc.add(qceD) + if !reflect.DeepEqual(qcc.entries, []queryContextEntry{qceD, qceC, qceB}) { + t.Fatalf("entry with greater priority should be added at the end. %v", qcc.entries) + } + }) +} + func TestAddingQueryContextCacheEntry(t *testing.T) { runSnowflakeConnTest(t, func(sc *snowflakeConn) { t.Run("First query (may be on empty cache)", func(t *testing.T) { - entriesBefore := sc.queryContextCache.entries + entriesBefore := make([]queryContextEntry, len(sc.queryContextCache.entries)) + copy(entriesBefore, sc.queryContextCache.entries) if _, err := sc.Query("SELECT 1", nil); err != nil { t.Fatalf("cannot query. %v", err) } @@ -116,11 +267,12 @@ func TestAddingQueryContextCacheEntry(t *testing.T) { }) t.Run("Second query (cache should not be empty)", func(t *testing.T) { - entriesBefore := sc.queryContextCache.entries + entriesBefore := make([]queryContextEntry, len(sc.queryContextCache.entries)) + copy(entriesBefore, sc.queryContextCache.entries) if len(entriesBefore) == 0 { t.Fatalf("cache should not be empty after first query") } - if _, err := sc.Query("SELECT 1", nil); err != nil { + if _, err := sc.Query("SELECT 2", nil); err != nil { t.Fatalf("cannot query. %v", err) } entriesAfter := sc.queryContextCache.entries