Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Dec 20, 2024
1 parent b4ef499 commit b25ffe0
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 78 deletions.
4 changes: 2 additions & 2 deletions quesma/quesma/async_search_storage/evictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (e *AsyncQueriesEvictor) AsyncQueriesGC() {
case <-e.ctx.Done():
logger.Debug().Msg("evictor stopped")
return
case <-time.After(GCInterval):
e.tryEvictAsyncRequests(EvictionInterval)
case <-time.After(gcInterval):
e.tryEvictAsyncRequests(evictionInterval)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions quesma/quesma/async_search_storage/in_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ package async_search_storage
import (
"encoding/json"
"fmt"
"math/rand"
"quesma/logger"
"quesma/persistence"
"quesma/quesma/config"
"strconv"
"time"
)

Expand All @@ -29,12 +27,15 @@ func NewAsyncRequestResultStorageInElasticsearch(cfg config.ElasticsearchConfigu
User: "",
Password: "",
}
*/
i := rand.Int()
fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg)
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000),
}
*/
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, defaultElasticDbName, defaultElasticDbStorageLimitInBytes),
}
}

func (s AsyncRequestResultStorageInElasticsearch) Store(id string, result *AsyncRequestResult) {
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/async_search_storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (e *AsyncQueryTraceLoggerEvictor) Stop() {
func (e *AsyncQueryTraceLoggerEvictor) TryFlushHangingAsyncQueryTrace(timeFun func(time.Time) time.Duration) {
asyncIds := []string{}
e.AsyncQueryTrace.Range(func(key string, value tracing.TraceCtx) bool {
if timeFun(value.Added) > EvictionInterval {
if timeFun(value.Added) > evictionInterval {
asyncIds = append(asyncIds, key)
logger.Error().Msgf("Async query %s was not finished", key)
var formattedLines strings.Builder
Expand All @@ -154,7 +154,7 @@ func (e *AsyncQueryTraceLoggerEvictor) FlushHangingAsyncQueryTrace(timeFun func(
defer recovery.LogPanic()
for {
select {
case <-time.After(GCInterval):
case <-time.After(gcInterval):
e.TryFlushHangingAsyncQueryTrace(timeFun)
case <-e.ctx.Done():
logger.Debug().Msg("AsyncQueryTraceLoggerEvictor stopped")
Expand Down
39 changes: 0 additions & 39 deletions quesma/quesma/async_search_storage/in_memory_test.go

This file was deleted.

55 changes: 30 additions & 25 deletions quesma/quesma/async_search_storage/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,46 @@ import (
"time"
)

const EvictionInterval = 15 * time.Minute
const GCInterval = 1 * time.Minute

type AsyncRequestResultStorage interface {
Store(id string, result *AsyncRequestResult)
Load(id string) (*AsyncRequestResult, error)
Delete(id string)
DocCount() int
SpaceInUse() int64
SpaceMaxAvailable() int64
const (
evictionInterval = 15 * time.Minute
gcInterval = 1 * time.Minute
defaultElasticDbName = "async_search_storage"
defaultElasticDbStorageLimitInBytes = int64(100 * 1024 * 1024 * 1024) // 100GB
)

evict(olderThan time.Duration)
}
type (
AsyncRequestResultStorage interface {
Store(id string, result *AsyncRequestResult)
Load(id string) (*AsyncRequestResult, error)
Delete(id string)
DocCount() int
SpaceInUse() int64
SpaceMaxAvailable() int64

type AsyncQueryContextStorage interface {
Store(context *AsyncQueryContext)
evict(olderThan time.Duration)
}

type AsyncRequestResult struct {
ResponseBody []byte `json:"responseBody"`
Added time.Time `json:"added"`
IsCompressed bool `json:"isCompressed"`
Err error `json:"err"`
}
evict(olderThan time.Duration)
}
AsyncQueryContextStorage interface {
Store(context *AsyncQueryContext)
evict(olderThan time.Duration)
}
AsyncRequestResult struct {
ResponseBody []byte `json:"responseBody"`
Added time.Time `json:"added"`
IsCompressed bool `json:"isCompressed"`
Err error `json:"err"`
}
)

func NewAsyncRequestResult(responseBody []byte, err error, added time.Time, isCompressed bool) *AsyncRequestResult {
return &AsyncRequestResult{ResponseBody: responseBody, Err: err, Added: added, IsCompressed: isCompressed}
}

func (r *AsyncRequestResult) toJSON(id string) *persistence.JSONWithSize {
const sizeInBytesUpperBoundEstimate = int64(100) // 100 is a rough upper bound estimate of the size of the rest of the fields
json := types.JSON{}
json["id"] = id
json["data"] = string(r.ResponseBody)
json["sizeInBytes"] = int64(len(r.ResponseBody)) + int64(len(id)) + 100 // 100 is a rough upper bound estimate of the size of the rest of the fields
json["sizeInBytes"] = int64(len(r.ResponseBody)) + int64(len(id)) + sizeInBytesUpperBoundEstimate
json["added"] = r.Added
return persistence.NewJSONWithSize(json, id, json["sizeInBytes"].(int64))
}
Expand All @@ -60,7 +65,7 @@ func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id str
}

func (c *AsyncQueryContext) toJSON() *persistence.JSONWithSize {
sizeInBytesUpperBoundEstimate := int64(100)
const sizeInBytesUpperBoundEstimate = int64(100)
json := types.JSON{}
json["id"] = c.id
json["added"] = c.added
Expand Down
12 changes: 5 additions & 7 deletions quesma/quesma/async_search_storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ func testConfig() config.ElasticsearchConfiguration {
}
}

func TestKK(t *testing.T) {
// TODO: remove this test after evicting from Clickhouse from UI works
t.Skip()
func TestEvictingAsyncQuery_1(t *testing.T) {
t.Skip("TODO: automize this test after evicting from Clickhouse from UI works")
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
a := clickhouse.OpenDB(&options)
ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid))
Expand All @@ -136,12 +135,11 @@ func TestKK(t *testing.T) {
fmt.Println(b, "q:", q, err)
}

func TestCancel(t *testing.T) {
// TODO: remove this test after evicting from Clickhouse from UI works
t.Skip()
func TestEvictingAsyncQuery_2(t *testing.T) {
t.Skip("TODO: automize this test after evicting from Clickhouse from UI works")
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
a := clickhouse.OpenDB(&options)

b, err := a.Query("KILL QUERY WHERE query_id= 'dupa'")
b, err := a.Query("KILL QUERY WHERE query_id= 'x'")
fmt.Println(b, err)
}

0 comments on commit b25ffe0

Please sign in to comment.