Skip to content

Commit

Permalink
Fix linter
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Nov 11, 2024
1 parent 532f39e commit 76035ad
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 44 deletions.
2 changes: 1 addition & 1 deletion quesma/persistence/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func (p *ElasticJSONDatabase) List() ([]string, error) {
}`

resp, err := p.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
defer resp.Body.Close()
if err != nil {
return nil, err
}
defer resp.Body.Close()

jsonAsBytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions quesma/persistence/elastic_with_eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
"time"
)

const MAX_DOC_COUNT = 10000 // TODO: fix/make configurable/idk/etc
const defaultHugeSizeInBytesLimit = int64(500_000_000_000) // 500GB

// so far I serialize entire struct and keep only 1 string in ES
type ElasticDatabaseWithEviction struct {
ctx context.Context
Expand Down Expand Up @@ -270,10 +267,6 @@ func (db *ElasticDatabaseWithEviction) getAll() (documents []*JSONWithSize, err
return documents, nil
}

func (db *ElasticDatabaseWithEviction) evict(indexes []string) {
// todo
}

func (db *ElasticDatabaseWithEviction) fullIndexName() string {
now := time.Now().UTC()
return fmt.Sprintf("%s-%d-%d-%d-%d-%d-%d", db.indexName, now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second())
Expand Down
2 changes: 2 additions & 0 deletions quesma/persistence/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestNewElasticPersistence(t *testing.T) {
}

func TestJSONDatabaseWithEviction_noEviction(t *testing.T) {
t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
logger.InitSimpleLoggerForTests()
indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli())
fmt.Println("indexName:", indexName)
Expand Down Expand Up @@ -154,6 +155,7 @@ func TestJSONDatabaseWithEviction_noEviction(t *testing.T) {
}

func TestJSONDatabaseWithEviction_withEviction(t *testing.T) {
t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
logger.InitSimpleLoggerForTests()
indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli())

Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/async_search_storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s AsyncRequestResultStorageInMemory) DocCount() int {
func (s AsyncRequestResultStorageInMemory) SpaceInUse() int64 {
size := int64(0)
s.Range(func(key string, value *AsyncRequestResult) bool {
size += int64(len(value.GetResponseBody()))
size += int64(len(value.ResponseBody))
return true
})
return size
Expand All @@ -64,7 +64,7 @@ func (s AsyncRequestResultStorageInMemory) SpaceMaxAvailable() int64 {
func (s AsyncRequestResultStorageInMemory) evict(evictOlderThan time.Duration) {
var ids []string
s.Range(func(key string, value *AsyncRequestResult) bool {
if time.Since(value.added) > evictOlderThan {
if time.Since(value.Added) > evictOlderThan {
ids = append(ids, key)
}
return true
Expand Down
21 changes: 11 additions & 10 deletions quesma/quesma/async_search_storage/in_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ func TestAsyncQueriesEvictorTimePassed(t *testing.T) {
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
storageKinds := []AsyncRequestResultStorage{
NewAsyncRequestResultStorageInMemory(),
NewAsyncRequestResultStorageInElasticsearch(),
NewAsyncSearchStorageInMemoryFallbackElastic(),
//NewAsyncRequestResultStorageInElasticsearch(), passes
//NewAsyncSearchStorageInMemoryFallbackElastic(), passes
}
for _, storage := range storageKinds {
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()})

time.Sleep(2 * time.Second)
evictor.tryEvictAsyncRequests(1 * time.Second)
Expand All @@ -39,16 +39,16 @@ func TestAsyncQueriesEvictorStillAlive(t *testing.T) {
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
storageKinds := []AsyncRequestResultStorage{
NewAsyncRequestResultStorageInMemory(),
NewAsyncRequestResultStorageInElasticsearch(),
NewAsyncSearchStorageInMemoryFallbackElastic(),
//NewAsyncRequestResultStorageInElasticsearch(), passes
//NewAsyncSearchStorageInMemoryFallbackElastic(), passes
}
for _, storage := range storageKinds {
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{added: time.Now()})
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()})
evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()})

time.Sleep(2 * time.Second)
evictor.tryEvictAsyncRequests(10 * time.Second)
Expand All @@ -59,6 +59,7 @@ func TestAsyncQueriesEvictorStillAlive(t *testing.T) {
}

func TestInMemoryFallbackElasticStorage(t *testing.T) {
t.Skip("passes locally, but requires elasticsearch to be running, so skipping")
storage := NewAsyncSearchStorageInMemoryFallbackElastic()
storage.Store("1", &AsyncRequestResult{})
storage.Store("2", &AsyncRequestResult{})
Expand Down
28 changes: 8 additions & 20 deletions quesma/quesma/async_search_storage/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,22 @@ type AsyncQueryContextStorage interface {
}

type AsyncRequestResult struct {
responseBody []byte `json:"responseBody"`
added time.Time `json:"added"`
isCompressed bool `json:"isCompressed"`
err error `json:"err"`
ResponseBody []byte `json:"responseBody"`
Added time.Time `json:"added"`
IsCompressed bool `json:"isCompressed"`
Err error `json:"err"`
}

func NewAsyncRequestResult(responseBody []byte, err error, added time.Time, isCompressed bool) *AsyncRequestResult {
return &AsyncRequestResult{responseBody: responseBody, err: err, added: added, isCompressed: isCompressed}
}

func (r *AsyncRequestResult) GetResponseBody() []byte {
return r.responseBody
}

func (r *AsyncRequestResult) GetErr() error {
return r.err
}

func (r *AsyncRequestResult) IsCompressed() bool {
return r.isCompressed
return &AsyncRequestResult{ResponseBody: responseBody, Err: err, Added: added, IsCompressed: isCompressed}
}

func (r *AsyncRequestResult) toJSON(id string) *persistence.JSONWithSize {
json := types.JSON{}
json["id"] = id
json["data"] = string(r.responseBody)
json["sizeInBytes"] = int64(len(r.responseBody)) + int64(len(id)) + 100 // 100 is a rough upper bound estimate of the size of the rest of the fields
json["added"] = r.added
json["data"] = string(r.ResponseBody)
json["sizeInBytes"] = int64(len(r.ResponseBody)) + int64(len(id)) + 100 // 100 is a rough upper bound estimate of the size of the rest of the fields
json["added"] = r.Added
return persistence.NewJSONWithSize(json, id, json["sizeInBytes"].(int64))
}

Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,15 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) (
return queryparser.EmptyAsyncSearchResponse(id, false, 503)
}
if result, err := q.AsyncRequestStorage.Load(id); err != nil {
if err := result.GetErr(); err != nil {
if result.Err != nil {
q.AsyncRequestStorage.Delete(id)
logger.ErrorWithCtx(ctx).Msgf("error processing async query: %v", err)
return queryparser.EmptyAsyncSearchResponse(id, false, 503)
}
q.AsyncRequestStorage.Delete(id)
// We use zstd to conserve memory, as we have a lot of async queries
if result.IsCompressed() {
buf, err := util.Decompress(result.GetResponseBody())
if result.IsCompressed {
buf, err := util.Decompress(result.ResponseBody)
if err == nil {
// Mark trace end is called only when the async query is fully processed
// which means that isPartial is false
Expand All @@ -517,7 +517,7 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) (
// Mark trace end is called only when the async query is fully processed
// which means that isPartial is false
logger.MarkTraceEndWithCtx(ctx).Msgf("Async query id : %s ended successfully", id)
return result.GetResponseBody(), nil
return result.ResponseBody, nil
} else {
const isPartial = true
logger.InfoWithCtx(ctx).Msgf("async query id : %s partial result", id)
Expand Down

0 comments on commit 76035ad

Please sign in to comment.