Persistent storage for asyncs 3 #895

Open · wants to merge 29 commits into base: main

Changes from 3 commits (of 29 total)
5 changes: 2 additions & 3 deletions quesma/persistence/elastic.go
@@ -118,11 +118,10 @@ func (p *ElasticJSONDatabase) List() ([]string, error) {
}`

resp, err := p.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
-
-defer resp.Body.Close()
if err != nil {
return nil, err
}
+defer resp.Body.Close()

jsonAsBytes, err := io.ReadAll(resp.Body)
if err != nil {
@@ -142,7 +141,7 @@ func (p *ElasticJSONDatabase) List() ([]string, error) {
var ids []string
// Unmarshal the JSON response
var result map[string]interface{}
-if err := json.Unmarshal(jsonAsBytes, &result); err != nil {
+if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
log.Fatalf("Error parsing the response JSON: %s", err)
}

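Note on the elastic.go hunk above: `defer resp.Body.Close()` is moved below the error check because `resp` may be nil when `Request` returns an error, and deferring the close first can panic. A minimal sketch of the pattern, using the standard library client as a stand-in for Quesma's httpClient:

package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetch shows the ordering applied above: check the error before deferring
// Body.Close(), because resp may be nil when err != nil.
func fetch(url string) ([]byte, error) {
	resp, err := http.Get(url) // stand-in for p.httpClient.Request(...)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	if body, err := fetch("http://localhost:9200"); err == nil {
		fmt.Println("read", len(body), "bytes")
	}
}
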
150 changes: 150 additions & 0 deletions quesma/persistence/elastic_with_eviction.go
@@ -0,0 +1,150 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package persistence

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"net/http"
"quesma/logger"
"quesma/quesma/config"
)

const MAX_DOC_COUNT = 10000 // prototype value; TODO: make configurable

// For now we serialize the entire struct and keep it as a single string in ES
type ElasticDatabaseWithEviction struct {
ctx context.Context
*ElasticJSONDatabase // maybe remove and copy fields here
EvictorInterface
sizeInBytesLimit int64
}

func NewElasticDatabaseWithEviction(ctx context.Context, cfg config.ElasticsearchConfiguration, indexName string, sizeInBytesLimit int64) *ElasticDatabaseWithEviction {
return &ElasticDatabaseWithEviction{
ctx:                 ctx,
ElasticJSONDatabase: NewElasticJSONDatabase(cfg, indexName),
EvictorInterface: &Evictor{},
sizeInBytesLimit: sizeInBytesLimit,
}
}

// TODO: do we need mutexes here, or some other synchronization?
func (db *ElasticDatabaseWithEviction) Put(id string, row Sizeable) bool {
currentSize, _ := db.SizeInBytes() // SizeInBytes returns (int64, bool); the success flag is ignored in this prototype
bytesNeeded := currentSize + row.SizeInBytes()
if bytesNeeded > db.SizeInBytesLimit() {
logger.InfoWithCtx(db.ctx).Msg("Database is full, evicting documents")
//docsToEvict, bytesEvicted := db.SelectToEvict(db.getAll(), bytesNeeded-db.SizeInBytesLimit())
//db.evict(docsToEvict)
//bytesNeeded -= bytesEvicted
}
if bytesNeeded > db.SizeInBytesLimit() {
// still over the limit even after (attempted) eviction, don't put the document
return false
}

serialized, err := db.serialize(row)
if err != nil {
logger.WarnWithCtx(db.ctx).Msg("Error serializing document, id:" + id)
return false
}

err = db.ElasticJSONDatabase.Put(id, serialized)
if err != nil {
logger.WarnWithCtx(db.ctx).Msgf("Error putting document, id: %s, error: %v", id, err)
return false
}

return true
}

// What should this return? Maybe a switch over both types we currently have?
func (db *ElasticDatabaseWithEviction) Get(id string) (string, bool) { // probably change return type to *Sizeable
value, success, err := db.ElasticJSONDatabase.Get(id)
if err != nil {
logger.WarnWithCtx(db.ctx).Msgf("Error getting document, id: %s, error: %v", id, err)
return "", false
}
return value, success
}

func (db *ElasticDatabaseWithEviction) Delete(id string) {
// mark as deleted, don't actually delete
// (single document deletion is hard in ES, it's done by evictor for entire index)
}

func (db *ElasticDatabaseWithEviction) DocCount() (count int, success bool) {
// TODO: add WHERE not_deleted

// Build the query to fetch only the total document count (no hits, no sources)
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)
query := `{
"_source": false,
"size": 0,
"track_total_hits": true
}`

resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
if err != nil {
return
}
defer resp.Body.Close()

jsonAsBytes, err := io.ReadAll(resp.Body)
if err != nil {
return
}

switch resp.StatusCode {
case http.StatusOK:
break
default:
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
return
}

// Unmarshal the JSON response
var result map[string]interface{}
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
return
}

count = int(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) // TODO: add some checks... to prevent panic
return count, true
}

func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, success bool) {
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)

// Build the query to get only document IDs (prototype - summing the sizes is not implemented yet)
query := fmt.Sprintf(`{"_source": false, "size": %d}`, MAX_DOC_COUNT)
_, _ = elasticsearchURL, query // TODO: send the request and sum the per-document sizes
return 0, false
}

func (db *ElasticDatabaseWithEviction) SizeInBytesLimit() int64 {
return db.sizeInBytesLimit
}

func (db *ElasticDatabaseWithEviction) getAll() []*basicDocumentInfo {
// send query
return nil
}

func (db *ElasticDatabaseWithEviction) evict(documents []*basicDocumentInfo) {

}

func (db *ElasticDatabaseWithEviction) serialize(row Sizeable) (serialized string, err error) {
var b bytes.Buffer

enc := gob.NewEncoder(&b) // maybe create one encoder and reuse it
if err = enc.Encode(row); err != nil {
fmt.Println("Error encoding struct:", err)
return
}

return b.String(), nil
}
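
For orientation, a rough usage sketch of the new eviction-aware store. The index name, the 512 MB limit, and the payload type are illustrative assumptions, not values taken from this PR; the example assumes it lives inside the quesma module so the internal import paths resolve.

package main

import (
	"context"
	"fmt"

	"quesma/persistence"
	"quesma/quesma/config"
)

// payload is an illustrative Sizeable implementation (field is exported so gob can encode it).
type payload struct {
	Body []byte
}

func (p *payload) SizeInBytes() int64 { return int64(len(p.Body)) }

func main() {
	var cfg config.ElasticsearchConfiguration // in Quesma this comes from the loaded configuration
	db := persistence.NewElasticDatabaseWithEviction(context.Background(), cfg, "quesma_async_storage", 512*1024*1024)

	if ok := db.Put("quesma_async_1", &payload{Body: []byte(`{"took": 1}`)}); !ok {
		fmt.Println("not stored: eviction could not free enough space, or the request failed")
		return
	}
	if value, found := db.Get("quesma_async_1"); found {
		fmt.Println("stored gob-encoded payload of", len(value), "bytes")
	}
}
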
18 changes: 18 additions & 0 deletions quesma/persistence/evictor.go
@@ -0,0 +1,18 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package persistence

type EvictorInterface interface {
SelectToEvict(documents []*basicDocumentInfo, sizeNeeded int64) (evictThese []*basicDocumentInfo, bytesEvicted int64)
}

// This is just one implementation; it seems well suited for Elasticsearch.
// It can be implemented differently.
type Evictor struct{}

func (e *Evictor) SelectToEvict(documents []*basicDocumentInfo, sizeNeeded int64) (evictThese []*basicDocumentInfo, bytesEvicted int64) {
if sizeNeeded <= 0 {
return // TODO: decide whether to return an empty slice or nil
}

return // TODO: selection strategy not implemented yet
}
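
SelectToEvict is still a stub. One possible strategy, sketched here purely as an assumption (it is not what the PR implements), is to evict the oldest documents first until enough bytes are freed:

package persistence

import "sort"

// oldestFirstEvictor is an illustrative alternative EvictorInterface implementation:
// sort by timestamp ascending and pick documents until sizeNeeded bytes are covered.
type oldestFirstEvictor struct{}

func (e *oldestFirstEvictor) SelectToEvict(documents []*basicDocumentInfo, sizeNeeded int64) (evictThese []*basicDocumentInfo, bytesEvicted int64) {
	if sizeNeeded <= 0 {
		return nil, 0
	}
	sorted := make([]*basicDocumentInfo, len(documents))
	copy(sorted, documents)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].timestamp.Before(sorted[j].timestamp)
	})
	for _, doc := range sorted {
		if bytesEvicted >= sizeNeeded {
			break
		}
		evictThese = append(evictThese, doc)
		bytesEvicted += doc.sizeInBytes
	}
	return evictThese, bytesEvicted
}
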
23 changes: 23 additions & 0 deletions quesma/persistence/model.go
@@ -2,6 +2,8 @@
// SPDX-License-Identifier: Elastic-2.0
package persistence

import "time"

// JSONDatabase is an interface for a database that stores JSON data.
// Treat it as `etcd` equivalent rather than `MongoDB`.
// The main usage is to store our configuration data, like
@@ -15,3 +17,24 @@ type JSONDatabase interface {
Get(key string) (string, bool, error)
Put(key string, data string) error
}

// T - type of the data to store, e.g. async_search_storage.AsyncRequestResult
type JSONDatabaseWithEviction interface { // is it really JSON-only? maybe not; check
Put(row *Sizeable) error
Get(id string) (*Sizeable, bool)
Delete(id string)
DocCount() int
SizeInBytes() int64
SizeInBytesLimit() int64
}

type basicDocumentInfo struct {
id string
sizeInBytes int64
timestamp time.Time
markedAsDeleted bool
}

type Sizeable interface {
SizeInBytes() int64
}
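
The comment above names async_search_storage.AsyncRequestResult as the intended stored type. A hedged sketch of how such a type could satisfy Sizeable, reusing the GetResponseBody accessor that in_memory.go already calls (this method is not part of the diff):

package async_search_storage

// Sketch only: with this method AsyncRequestResult would satisfy persistence.Sizeable.
func (r *AsyncRequestResult) SizeInBytes() int64 {
	return int64(len(r.GetResponseBody()))
}
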
11 changes: 10 additions & 1 deletion quesma/quesma/async_search_storage/in_memory.go
@@ -41,10 +41,19 @@ func (s AsyncSearchStorageInMemory) Delete(id string) {
s.idToResult.Delete(id)
}

-func (s AsyncSearchStorageInMemory) Size() int {
+func (s AsyncSearchStorageInMemory) DocCount() int {
return s.idToResult.Size()
}

func (s AsyncSearchStorageInMemory) SizeInBytes() int {
size := 0
s.Range(func(key string, value *AsyncRequestResult) bool {
size += len(value.GetResponseBody())
return true
})
return size
}

type AsyncQueryContextStorageInMemory struct {
idToContext *concurrent.Map[string, *AsyncQueryContext]
}
5 changes: 3 additions & 2 deletions quesma/quesma/async_search_storage/model.go
@@ -9,10 +9,11 @@ import (

type AsyncRequestResultStorage interface {
Store(id string, result *AsyncRequestResult)
Range(func(key string, value *AsyncRequestResult) bool) // ideally I'd like to get rid of this, but not sure if it's possible
Load(id string) (*AsyncRequestResult, bool)
Delete(id string)
-Size() int
+DocCount() int
+SizeInBytes() uint64
+SizeInBytesLimit() uint64
}

// TODO: maybe merge those 2?
11 changes: 1 addition & 10 deletions quesma/quesma/search.go
@@ -490,15 +490,6 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn
return
}

-func (q *QueryRunner) asyncQueriesCumulatedBodySize() int {
-size := 0
-q.AsyncRequestStorage.Range(func(key string, value *async_search_storage.AsyncRequestResult) bool {
-size += len(value.GetResponseBody())
-return true
-})
-return size
-}

func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) {
if !strings.Contains(id, tracing.AsyncIdPrefix) {
logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id)
@@ -543,7 +534,7 @@ func (q *QueryRunner) deleteAsyncSearch(id string) ([]byte, error) {
}

func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- asyncSearchWithError) bool {
-if q.AsyncRequestStorage.Size() < asyncQueriesLimit && q.asyncQueriesCumulatedBodySize() < asyncQueriesLimitBytes {
+if q.AsyncRequestStorage.DocCount() < asyncQueriesLimit && q.AsyncRequestStorage.SizeInBytes() < asyncQueriesLimitBytes {
return false
}
err := errors.New("too many async queries")
Expand Down