Skip to content

Commit

Permalink
Fix all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Dec 20, 2024
1 parent 51b1e68 commit 89a5180
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 57 deletions.
5 changes: 2 additions & 3 deletions quesma/persistence/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestNewElasticPersistence(t *testing.T) {
}

func TestJSONDatabaseWithEviction_noEviction(t *testing.T) {
t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now")
logger.InitSimpleLoggerForTests()
indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli())
fmt.Println("indexName:", indexName)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestJSONDatabaseWithEviction_noEviction(t *testing.T) {
}

func TestJSONDatabaseWithEviction_withEviction(t *testing.T) {
t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now")
logger.InitSimpleLoggerForTests()
indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli())

Expand Down Expand Up @@ -213,7 +213,6 @@ func TestJSONDatabaseWithEviction_withEviction(t *testing.T) {
assert.Equal(t, 2, docCount)

err = db.Put(docs[4])
fmt.Println("put", docs[4].SizeInBytesTotal, err)
assert.NoError(t, err)

time.Sleep(elasticUpdateTime)
Expand Down
9 changes: 4 additions & 5 deletions quesma/quesma/async_search_storage/in_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ func NewAsyncRequestResultStorageInElasticsearch(cfg config.ElasticsearchConfigu
User: "",
Password: "",
}
i := rand.Int()
fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg)
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000),
}
fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg)
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000),
}
*/
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, defaultElasticDbName, defaultElasticDbStorageLimitInBytes),
Expand Down
112 changes: 64 additions & 48 deletions quesma/quesma/async_search_storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,73 @@ import (
)

func TestAsyncQueriesEvictorTimePassed(t *testing.T) {
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
//realUrl, _ := url.Parse("http://localhost:9201")
//cfgUrl := config.Url(*realUrl)
//cfg := config.ElasticsearchConfiguration{Url: &cfgUrl}
storageKinds := []AsyncRequestResultStorage{
NewAsyncRequestResultStorageInMemory(),
//NewAsyncRequestResultStorageInElasticsearch(cfg), // passes, reskip after merge
//NewAsyncSearchStorageInMemoryFallbackElastic(cfg), // passes, reskip after merge
NewAsyncRequestResultStorageInElasticsearch(testConfig()),
NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()),
}
for _, storage := range storageKinds {
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()})

time.Sleep(2 * time.Second)
evictor.tryEvictAsyncRequests(1 * time.Second)
time.Sleep(2 * time.Second)

assert.Equal(t, 0, evictor.AsyncRequestStorage.DocCount())
t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) {
_, inMemory := storage.(AsyncRequestResultStorageInMemory)
if !inMemory {
t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now")
}

queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now().Add(-2 * time.Second)})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now().Add(-5 * time.Second)})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now().Add(2 * time.Second)})

if !inMemory {
time.Sleep(2 * time.Second)
}
evictor.tryEvictAsyncRequests(1 * time.Second)
if !inMemory {
time.Sleep(2 * time.Second)
}

assert.Equal(t, 1, evictor.AsyncRequestStorage.DocCount())
})
}
}

func TestAsyncQueriesEvictorStillAlive(t *testing.T) {
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
//realUrl, _ := url.Parse("http://localhost:9201")
//cfgUrl := config.Url(*realUrl)
//cfg := config.ElasticsearchConfiguration{Url: &cfgUrl}
storageKinds := []AsyncRequestResultStorage{
NewAsyncRequestResultStorageInMemory(),
//NewAsyncRequestResultStorageInElasticsearch(cfg), // passes, reskip after merge
//NewAsyncSearchStorageInMemoryFallbackElastic(cfg), // passes, reskip after merge
NewAsyncRequestResultStorageInElasticsearch(testConfig()),
NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()),
}
for _, storage := range storageKinds {
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()})

time.Sleep(2 * time.Second)
evictor.tryEvictAsyncRequests(10 * time.Second)
time.Sleep(2 * time.Second)

assert.Equal(t, 3, evictor.AsyncRequestStorage.DocCount())
t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) {
_, inMemory := storage.(AsyncRequestResultStorageInMemory)
if !inMemory {
t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now")
}

queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()})

if !inMemory {
time.Sleep(2 * time.Second)
}
evictor.tryEvictAsyncRequests(10 * time.Second)
if !inMemory {
time.Sleep(2 * time.Second)
}

assert.Equal(t, 3, evictor.AsyncRequestStorage.DocCount())
})
}
}

func TestInMemoryFallbackElasticStorage(t *testing.T) {
//t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now")
storage := NewAsyncSearchStorageInMemoryFallbackElastic(testConfig())
storage.Store("1", &AsyncRequestResult{})
storage.Store("2", &AsyncRequestResult{})
Expand Down Expand Up @@ -122,24 +136,26 @@ func testConfig() config.ElasticsearchConfiguration {
func TestEvictingAsyncQuery_1(t *testing.T) {
t.Skip("TODO: automize this test after evicting from Clickhouse from UI works")
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
a := clickhouse.OpenDB(&options)
ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid))
db := clickhouse.OpenDB(&options)
defer db.Close()

b, err := a.QueryContext(ctx, "SELECT number FROM (SELECT number FROM numbers(100_000_000_000)) ORDER BY number DESC LIMIT 10")
var q int64
for b.Next() {
b.Scan(&q)
fmt.Println(q)
ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid))
rows, err := db.QueryContext(ctx, "SELECT number FROM (SELECT number FROM numbers(100_000_000_000)) ORDER BY number DESC LIMIT 10")
var i int64
for rows.Next() {
rows.Scan(&i)
fmt.Println(i)
}

fmt.Println(b, "q:", q, err)
fmt.Println(rows, "i:", i, err)
}

func TestEvictingAsyncQuery_2(t *testing.T) {
t.Skip("TODO: automize this test after evicting from Clickhouse from UI works")
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
a := clickhouse.OpenDB(&options)
db := clickhouse.OpenDB(&options)
defer db.Close()

b, err := a.Query("KILL QUERY WHERE query_id= 'x'")
fmt.Println(b, err)
rows, err := db.Query("KILL QUERY WHERE query_id= 'x'")
fmt.Println(rows, err)
}
2 changes: 1 addition & 1 deletion quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic
logManager: logManager,
publicPort: config.PublicTcpPort,
asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor(
queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory),
queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncRequestResultStorageInMemory),
queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory),
),
queryRunner: queryProcessor,
Expand Down

0 comments on commit 89a5180

Please sign in to comment.