From 89a5180e76e05dac9e77fc93f22eac095fb70eaf Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Fri, 20 Dec 2024 17:20:46 +0100 Subject: [PATCH] Fix all tests --- quesma/persistence/persistence_test.go | 5 +- .../async_search_storage/in_elasticsearch.go | 9 +- .../async_search_storage/storage_test.go | 112 ++++++++++-------- quesma/quesma/dual_write_proxy_v2.go | 2 +- 4 files changed, 71 insertions(+), 57 deletions(-) diff --git a/quesma/persistence/persistence_test.go b/quesma/persistence/persistence_test.go index 527c68a67..4518c5a17 100644 --- a/quesma/persistence/persistence_test.go +++ b/quesma/persistence/persistence_test.go @@ -86,7 +86,7 @@ func TestNewElasticPersistence(t *testing.T) { } func TestJSONDatabaseWithEviction_noEviction(t *testing.T) { - t.Skip("passes locally, but requires elasticsearch to be running, so skipping") + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") logger.InitSimpleLoggerForTests() indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli()) fmt.Println("indexName:", indexName) @@ -155,7 +155,7 @@ func TestJSONDatabaseWithEviction_noEviction(t *testing.T) { } func TestJSONDatabaseWithEviction_withEviction(t *testing.T) { - t.Skip("passes locally, but requires elasticsearch to be running, so skipping") + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") logger.InitSimpleLoggerForTests() indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli()) @@ -213,7 +213,6 @@ func TestJSONDatabaseWithEviction_withEviction(t *testing.T) { assert.Equal(t, 2, docCount) err = db.Put(docs[4]) - fmt.Println("put", docs[4].SizeInBytesTotal, err) assert.NoError(t, err) time.Sleep(elasticUpdateTime) diff --git a/quesma/quesma/async_search_storage/in_elasticsearch.go b/quesma/quesma/async_search_storage/in_elasticsearch.go index b8d489bf8..62d53cb2b 100644 --- a/quesma/quesma/async_search_storage/in_elasticsearch.go +++ b/quesma/quesma/async_search_storage/in_elasticsearch.go @@ -27,11 +27,10 @@ func NewAsyncRequestResultStorageInElasticsearch(cfg config.ElasticsearchConfigu User: "", Password: "", } - i := rand.Int() - fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg) - return AsyncRequestResultStorageInElasticsearch{ - db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000), - } + fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg) + return AsyncRequestResultStorageInElasticsearch{ + db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000), + } */ return AsyncRequestResultStorageInElasticsearch{ db: persistence.NewElasticDatabaseWithEviction(cfg, defaultElasticDbName, defaultElasticDbStorageLimitInBytes), diff --git a/quesma/quesma/async_search_storage/storage_test.go b/quesma/quesma/async_search_storage/storage_test.go index b17e19e53..4af98fbc4 100644 --- a/quesma/quesma/async_search_storage/storage_test.go +++ b/quesma/quesma/async_search_storage/storage_test.go @@ -15,59 +15,73 @@ import ( ) func TestAsyncQueriesEvictorTimePassed(t *testing.T) { - // TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic) - //realUrl, _ := url.Parse("http://localhost:9201") - //cfgUrl := config.Url(*realUrl) - //cfg := config.ElasticsearchConfiguration{Url: &cfgUrl} storageKinds := []AsyncRequestResultStorage{ NewAsyncRequestResultStorageInMemory(), - //NewAsyncRequestResultStorageInElasticsearch(cfg), // passes, reskip after merge - //NewAsyncSearchStorageInMemoryFallbackElastic(cfg), // passes, reskip after merge + NewAsyncRequestResultStorageInElasticsearch(testConfig()), + NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()), } for _, storage := range storageKinds { - queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) - queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) - evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) - evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()}) - evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()}) - evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()}) - - time.Sleep(2 * time.Second) - evictor.tryEvictAsyncRequests(1 * time.Second) - time.Sleep(2 * time.Second) - - assert.Equal(t, 0, evictor.AsyncRequestStorage.DocCount()) + t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) { + _, inMemory := storage.(AsyncRequestResultStorageInMemory) + if !inMemory { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + } + + queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) + queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) + evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) + evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now().Add(-2 * time.Second)}) + evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now().Add(-5 * time.Second)}) + evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now().Add(2 * time.Second)}) + + if !inMemory { + time.Sleep(2 * time.Second) + } + evictor.tryEvictAsyncRequests(1 * time.Second) + if !inMemory { + time.Sleep(2 * time.Second) + } + + assert.Equal(t, 1, evictor.AsyncRequestStorage.DocCount()) + }) } } func TestAsyncQueriesEvictorStillAlive(t *testing.T) { - // TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic) - //realUrl, _ := url.Parse("http://localhost:9201") - //cfgUrl := config.Url(*realUrl) - //cfg := config.ElasticsearchConfiguration{Url: &cfgUrl} storageKinds := []AsyncRequestResultStorage{ NewAsyncRequestResultStorageInMemory(), - //NewAsyncRequestResultStorageInElasticsearch(cfg), // passes, reskip after merge - //NewAsyncSearchStorageInMemoryFallbackElastic(cfg), // passes, reskip after merge + NewAsyncRequestResultStorageInElasticsearch(testConfig()), + NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()), } for _, storage := range storageKinds { - queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) - queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) - evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) - evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()}) - evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()}) - evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()}) - - time.Sleep(2 * time.Second) - evictor.tryEvictAsyncRequests(10 * time.Second) - time.Sleep(2 * time.Second) - - assert.Equal(t, 3, evictor.AsyncRequestStorage.DocCount()) + t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) { + _, inMemory := storage.(AsyncRequestResultStorageInMemory) + if !inMemory { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + } + + queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) + queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) + evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) + evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()}) + evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()}) + evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()}) + + if !inMemory { + time.Sleep(2 * time.Second) + } + evictor.tryEvictAsyncRequests(10 * time.Second) + if !inMemory { + time.Sleep(2 * time.Second) + } + + assert.Equal(t, 3, evictor.AsyncRequestStorage.DocCount()) + }) } } func TestInMemoryFallbackElasticStorage(t *testing.T) { - //t.Skip("passes locally, but requires elasticsearch to be running, so skipping") + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") storage := NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()) storage.Store("1", &AsyncRequestResult{}) storage.Store("2", &AsyncRequestResult{}) @@ -122,24 +136,26 @@ func testConfig() config.ElasticsearchConfiguration { func TestEvictingAsyncQuery_1(t *testing.T) { t.Skip("TODO: automize this test after evicting from Clickhouse from UI works") options := clickhouse.Options{Addr: []string{"localhost:9000"}} - a := clickhouse.OpenDB(&options) - ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid)) + db := clickhouse.OpenDB(&options) + defer db.Close() - b, err := a.QueryContext(ctx, "SELECT number FROM (SELECT number FROM numbers(100_000_000_000)) ORDER BY number DESC LIMIT 10") - var q int64 - for b.Next() { - b.Scan(&q) - fmt.Println(q) + ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid)) + rows, err := db.QueryContext(ctx, "SELECT number FROM (SELECT number FROM numbers(100_000_000_000)) ORDER BY number DESC LIMIT 10") + var i int64 + for rows.Next() { + rows.Scan(&i) + fmt.Println(i) } - fmt.Println(b, "q:", q, err) + fmt.Println(rows, "i:", i, err) } func TestEvictingAsyncQuery_2(t *testing.T) { t.Skip("TODO: automize this test after evicting from Clickhouse from UI works") options := clickhouse.Options{Addr: []string{"localhost:9000"}} - a := clickhouse.OpenDB(&options) + db := clickhouse.OpenDB(&options) + defer db.Close() - b, err := a.Query("KILL QUERY WHERE query_id= 'x'") - fmt.Println(b, err) + rows, err := db.Query("KILL QUERY WHERE query_id= 'x'") + fmt.Println(rows, err) } diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 32594025a..6a80b01ee 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -122,7 +122,7 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic logManager: logManager, publicPort: config.PublicTcpPort, asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( - queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), + queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncRequestResultStorageInMemory), queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), ), queryRunner: queryProcessor,