diff --git a/api/routes.go b/api/routes.go index 585a823f..94db3aed 100644 --- a/api/routes.go +++ b/api/routes.go @@ -208,7 +208,14 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov if err != nil { panic(err) } - cache := query.NewQueryCache(store) + lstore, err := sturdycache.AddLocalCache(store) + if err != nil { + panic(err) + } + cache, err := query.NewQueryCacheWithInvalidator(lstore) + if err != nil { + panic(err) + } logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas()) defaultHeaders := []string{ diff --git a/app/query/cache.go b/app/query/cache.go index 1606c9ec..4f4d5f7f 100644 --- a/app/query/cache.go +++ b/app/query/cache.go @@ -9,6 +9,7 @@ import ( "time" "github.com/OdyseeTeam/odysee-api/internal/monitor" + "github.com/OdyseeTeam/odysee-api/pkg/chainquery" "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" "github.com/eko/gocache/lib/v4/cache" @@ -18,6 +19,11 @@ import ( "golang.org/x/sync/singleflight" ) +const ( + methodTagSeparator = ":" + invalidationInterval = 15 * time.Second +) + type CacheRequest struct { Method string Params any @@ -31,16 +37,46 @@ type CachedResponse struct { type QueryCache struct { cache *marshaler.Marshaler singleflight *singleflight.Group + height int + stopChan chan struct{} } -func NewQueryCache(store cache.CacheInterface[any]) *QueryCache { - marshal := marshaler.New(store) +func NewQueryCache(baseCache cache.CacheInterface[any]) *QueryCache { + marshal := marshaler.New(baseCache) return &QueryCache{ cache: marshal, singleflight: &singleflight.Group{}, + stopChan: make(chan struct{}), } } +func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCache, error) { + qc := NewQueryCache(baseCache) + height, err := chainquery.GetHeight() + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc() + return nil, fmt.Errorf("failed to get current height: %w", err) + } + qc.height = height + + go func() { + ticker := time.NewTicker(invalidationInterval) + for { + select { + case <-ticker.C: + err := qc.runInvalidator() + if err != nil { + logger.Log().Warn(err.Error()) + } + case <-qc.stopChan: + return + } + } + }() + + return qc, nil +} + func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) { log := logger.Log() cacheReq := CacheRequest{ @@ -72,10 +108,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached start := time.Now() obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter) if err != nil { - ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start) + ObserveQueryCacheRetrievalDuration(CacheResultError, cacheReq.Method, start) return nil, fmt.Errorf("error calling getter: %w", err) } - ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) + ObserveQueryCacheRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) res, ok := obj.(*jsonrpc.RPCResponse) if !ok { @@ -94,7 +130,7 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached ) if err != nil { ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start) - monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method}) + monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{ParamMethod: cacheReq.Method}) log.Warnf("error during cache.set: %s", err) return cacheResp, nil } @@ -103,6 +139,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached return cacheResp, nil } + if hit == nil { + ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start) + return nil, nil + } log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start) cacheResp, ok := hit.(*CachedResponse) @@ -112,6 +152,30 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached return cacheResp, nil } +func (c *QueryCache) runInvalidator() error { + height, err := chainquery.GetHeight() + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc() + return fmt.Errorf("failed to get current height: %w", err) + } + if c.height >= height { + return nil + } + c.height = height + + ctx, cancel := context.WithTimeout(context.Background(), invalidationInterval) + defer cancel() + err = c.cache.Invalidate(ctx, store.WithInvalidateTags( + []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, MethodClaimSearch)}, + )) + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaInvalidateCall).Inc() + return fmt.Errorf("failed to invalidate %s entries: %w", MethodClaimSearch, err) + } + + return nil +} + func (r CacheRequest) Expiration() time.Duration { switch r.Method { case MethodResolve: @@ -124,7 +188,7 @@ func (r CacheRequest) Expiration() time.Duration { } func (r CacheRequest) Tags() []string { - return []string{"method:" + r.Method} + return []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, r.Method)} } func (r CacheRequest) GetCacheKey() string { @@ -135,7 +199,7 @@ func (r CacheRequest) GetCacheKey() string { params = "()" } else { if p, err := json.Marshal(r.Params); err != nil { - params = "(x)" + params = "(error)" } else { params = string(p) } diff --git a/app/query/cache_test.go b/app/query/cache_test.go index d1621692..3ac7efeb 100644 --- a/app/query/cache_test.go +++ b/app/query/cache_test.go @@ -12,7 +12,6 @@ import ( func TestGetCacheKey(t *testing.T) { assert := assert.New(t) require := require.New(t) - assert.Equal(1, 1) seen := map[string]bool{} params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil} genCacheKey := func(params map[string]any) string { @@ -35,3 +34,12 @@ func TestGetCacheKey(t *testing.T) { } assert.Contains(seen, genCacheKey(params[1])) } + +func TestNilResponse(t *testing.T) { + // assert := assert.New(t) + // require := require.New(t) + cr := func() *CachedResponse { + return nil + }() + cr.RPCResponse(11111) +} diff --git a/app/query/metrics.go b/app/query/metrics.go index 659c8292..2eaea2af 100644 --- a/app/query/metrics.go +++ b/app/query/metrics.go @@ -15,11 +15,14 @@ const ( CacheResultMiss = "miss" CacheResultSuccess = "success" CacheResultError = "error" + + CacheAreaChainquery = "chainquery" + CacheAreaInvalidateCall = "invalidate_call" ) var ( - queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} - cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} + QueryCacheRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} + cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} QueryCacheOperationDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ @@ -30,21 +33,29 @@ var ( }, []string{"operation", "result", "method"}, ) - QueryRetrievalDuration = promauto.NewHistogramVec( + QueryCacheRetrievalDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "query_cache", Name: "retrieval_duration_seconds", Help: "Latency for cold cache retrieval", - Buckets: queryRetrievalDurationBuckets, + Buckets: QueryCacheRetrievalDurationBuckets, }, []string{"result", "method"}, ) + QueryCacheErrorCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "query_cache", + Name: "error_count", + Help: "Errors unrelated to cache setting/retrieval", + }, + []string{"area"}, + ) ) func ObserveQueryCacheOperation(operation, result, method string, start time.Time) { QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds())) } -func ObserveQueryRetrievalDuration(result, method string, start time.Time) { - QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) +func ObserveQueryCacheRetrievalDuration(result, method string, start time.Time) { + QueryCacheRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) } diff --git a/go.mod b/go.mod index 52690c0a..86d68cb3 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,8 @@ require ( github.com/dgraph-io/ristretto v0.2.0 github.com/eko/gocache/lib/v4 v4.1.6 github.com/eko/gocache/store/redis/v4 v4.2.2 + github.com/eko/gocache/store/ristretto/v4 v4.2.2 + github.com/friendsofgo/errors v0.9.2 github.com/getsentry/sentry-go v0.13.0 github.com/go-chi/chi/v5 v5.0.8 github.com/go-chi/cors v1.2.1 @@ -61,6 +63,7 @@ require ( go.uber.org/zap v1.21.0 goa.design/goa/v3 v3.5.2 goa.design/plugins/v3 v3.4.3 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/oauth2 v0.6.0 golang.org/x/sync v0.9.0 gopkg.in/vansante/go-ffprobe.v2 v2.1.0 @@ -79,7 +82,7 @@ require ( github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect ) require ( diff --git a/go.sum b/go.sum index f0428ee8..865added 100644 --- a/go.sum +++ b/go.sum @@ -840,6 +840,8 @@ github.com/eko/gocache/lib/v4 v4.1.6 h1:5WWIGISKhE7mfkyF+SJyWwqa4Dp2mkdX8QsZpnEN github.com/eko/gocache/lib/v4 v4.1.6/go.mod h1:HFxC8IiG2WeRotg09xEnPD72sCheJiTSr4Li5Ameg7g= github.com/eko/gocache/store/redis/v4 v4.2.2 h1:Thw31fzGuH3WzJywsdbMivOmP550D6JS7GDHhvCJPA0= github.com/eko/gocache/store/redis/v4 v4.2.2/go.mod h1:LaTxLKx9TG/YUEybQvPMij++D7PBTIJ4+pzvk0ykz0w= +github.com/eko/gocache/store/ristretto/v4 v4.2.2 h1:lXFzoZ5ck6Gy6ON7f5DHSkNt122qN7KoroCVgVwF7oo= +github.com/eko/gocache/store/ristretto/v4 v4.2.2/go.mod h1:uIvBVJzqRepr5L0RsbkfQ2iYfbyos2fuji/s4yM+aUM= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -872,6 +874,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/friendsofgo/errors v0.9.2 h1:X6NYxef4efCBdwI7BgS820zFaN7Cphrmb+Pljdzjtgk= +github.com/friendsofgo/errors v0.9.2/go.mod h1:yCvFW5AkDIL9qn7suHVLiI/gH228n7PC4Pn44IGoTOI= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -2504,6 +2508,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= diff --git a/pkg/chainquery/chainquery.go b/pkg/chainquery/chainquery.go new file mode 100644 index 00000000..d1aea1ac --- /dev/null +++ b/pkg/chainquery/chainquery.go @@ -0,0 +1,83 @@ +package chainquery + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/friendsofgo/errors" +) + +const ( + apiUrl = "https://chainquery.odysee.tv/api/sql" + queryTimeout = 15 * time.Second +) + +type HttpDoer interface { + Do(*http.Request) (*http.Response, error) +} + +type HeightResponse struct { + Success bool `json:"success"` + Error *string `json:"error"` + Data []HeightData `json:"data"` +} + +type HeightData struct { + Height int `json:"height"` +} + +var client HttpDoer = &http.Client{ + Timeout: queryTimeout, +} + +func GetHeight() (int, error) { + r := HeightResponse{} + err := makeRequest(client, "select height from block order by id desc limit 1", &r) + if err != nil { + return 0, errors.Wrap(err, "error retrieving latest height") + } + if len(r.Data) != 1 { + return 0, errors.Errorf("error retrieving latest height, expected %v items in response, got %v", 1, len(r.Data)) + } + return r.Data[0].Height, nil +} + +func makeRequest(client HttpDoer, query string, target any) error { + baseUrl, err := url.Parse(apiUrl) + if err != nil { + return err + } + params := url.Values{} + params.Add("query", query) + baseUrl.RawQuery = params.Encode() + + req, err := http.NewRequest(http.MethodGet, baseUrl.String(), nil) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("error sending request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: got %v, want %v", resp.StatusCode, http.StatusOK) + } + + err = json.Unmarshal(body, target) + if err != nil { + return err + } + return nil +} diff --git a/pkg/chainquery/chainquery_test.go b/pkg/chainquery/chainquery_test.go new file mode 100644 index 00000000..c1bc6915 --- /dev/null +++ b/pkg/chainquery/chainquery_test.go @@ -0,0 +1,54 @@ +package chainquery + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type failureTestClient struct{} + +func (_ failureTestClient) Do(*http.Request) (*http.Response, error) { + data := HeightResponse{ + Success: true, + Error: nil, + Data: []HeightData{}, + } + + body, _ := json.Marshal(data) + + return &http.Response{ + Status: "200 OK", + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil +} + +func TestGetHeight(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + + height, err := GetHeight() + require.NoError(err) + assert.Greater(height, 1500_000) +} + +func TestGetHeightFailure(t *testing.T) { + assert := assert.New(t) + + origClient := client + client = failureTestClient{} + defer func() { + client = origClient + }() + + height, err := GetHeight() + assert.ErrorContains(err, "error retrieving latest height, expected 1 items in response, got 0") + assert.Equal(0, height) +} diff --git a/pkg/sturdycache/sturdycache.go b/pkg/sturdycache/sturdycache.go index dbba0035..693f782d 100644 --- a/pkg/sturdycache/sturdycache.go +++ b/pkg/sturdycache/sturdycache.go @@ -2,14 +2,19 @@ package sturdycache import ( "context" + "time" + "github.com/dgraph-io/ristretto" "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" redis_store "github.com/eko/gocache/store/redis/v4" + ristretto_store "github.com/eko/gocache/store/ristretto/v4" "github.com/redis/go-redis/v9" "golang.org/x/exp/rand" ) +const ReplicatedCacheType = "redis" + type ReplicatedCache struct { masterCache *cache.Cache[any] replicaCaches []*cache.Cache[any] @@ -23,6 +28,7 @@ func NewReplicatedCache( replicaAddrs []string, password string, ) (*ReplicatedCache, error) { + masterClient := redis.NewClient(&redis.Options{ Addr: masterAddr, Password: password, @@ -49,12 +55,29 @@ func NewReplicatedCache( replicaCaches = append(replicaCaches, cache.New[any](replicaStore)) } - cache := &ReplicatedCache{ + baseStore := &ReplicatedCache{ masterCache: masterCache, replicaCaches: replicaCaches, readCaches: append(replicaCaches, masterCache), } + return baseStore, nil +} + +// AddLocalCache adds a local in-memory cache layer to a replicated cache instance. +func AddLocalCache(baseCache *ReplicatedCache) (cache.CacheInterface[any], error) { + // About 50k resolve responses with average size of 10KB + ristrettoCache, err := ristretto.NewCache(&ristretto.Config{NumCounters: 500_000, MaxCost: 500_000_000, BufferItems: 64}) + if err != nil { + return nil, err + } + ristrettoStore := ristretto_store.NewRistretto(ristrettoCache) + + cache := cache.NewChain[any]( + cache.New[any](ristrettoStore), + cache.New[any](baseCache), + ) + return cache, nil } @@ -68,6 +91,11 @@ func (rc *ReplicatedCache) Get(ctx context.Context, key any) (any, error) { return rc.readCaches[rand.Intn(len(rc.readCaches))].Get(ctx, key) } +// Get reads from master and replica caches. +func (rc *ReplicatedCache) GetWithTTL(ctx context.Context, key any) (any, time.Duration, error) { + return rc.readCaches[rand.Intn(len(rc.readCaches))].GetWithTTL(ctx, key) +} + // Invalidate master cache entries for given options. func (rc *ReplicatedCache) Invalidate(ctx context.Context, options ...store.InvalidateOption) error { return rc.masterCache.Invalidate(ctx, options...) @@ -85,5 +113,5 @@ func (rc *ReplicatedCache) Clear(ctx context.Context) error { // GetType returns cache type name. func (rc *ReplicatedCache) GetType() string { - return "replicated_redis" + return ReplicatedCacheType } diff --git a/pkg/sturdycache/sturdycache_test.go b/pkg/sturdycache/sturdycache_test.go index fb72f9b8..b3eb2bc0 100644 --- a/pkg/sturdycache/sturdycache_test.go +++ b/pkg/sturdycache/sturdycache_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" "github.com/stretchr/testify/suite" ) @@ -16,12 +17,13 @@ import ( type ReplicatedCacheTestSuite struct { suite.Suite - master *miniredis.Miniredis - replicas []*miniredis.Miniredis - cache *ReplicatedCache - teardownFunc teardownFunc - ctx context.Context - cancel context.CancelFunc + master *miniredis.Miniredis + replicas []*miniredis.Miniredis + cache cache.CacheInterface[any] + replicatedCache *ReplicatedCache + teardownFunc teardownFunc + ctx context.Context + cancel context.CancelFunc } type TestStruct struct { @@ -38,12 +40,16 @@ func (t *TestStruct) UnmarshalBinary(data []byte) error { } func (s *ReplicatedCacheTestSuite) SetupTest() { - s.cache, s.master, s.replicas, s.teardownFunc = CreateTestCache(s.T()) + var err error + s.replicatedCache, s.master, s.replicas, s.teardownFunc = CreateTestCache(s.T()) + s.cache, err = AddLocalCache(s.replicatedCache) + s.Require().NoError(err) } func (s *ReplicatedCacheTestSuite) TearDownTest() { s.teardownFunc() } + func (s *ReplicatedCacheTestSuite) SetupSuite() { s.ctx, s.cancel = context.WithTimeout(context.Background(), 30*time.Second) } @@ -54,8 +60,8 @@ func (s *ReplicatedCacheTestSuite) TearDownSuite() { func (s *ReplicatedCacheTestSuite) TestNewReplicatedCache() { s.Require().NotNil(s.cache) - s.Require().NotNil(s.cache.masterCache) - s.Require().Len(s.cache.replicaCaches, len(s.replicas)) + s.Require().NotNil(s.replicatedCache.masterCache) + s.Require().Len(s.replicatedCache.replicaCaches, len(s.replicas)) } func (s *ReplicatedCacheTestSuite) TestSet() {