Skip to content

Commit

Permalink
Add local cache and invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 28, 2024
1 parent 83aaa11 commit 17a8be7
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 27 deletions.
9 changes: 8 additions & 1 deletion api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,14 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov
if err != nil {
panic(err)
}
cache := query.NewQueryCache(store)
lstore, err := sturdycache.AddLocalCache(store)
if err != nil {
panic(err)
}
cache, err := query.NewQueryCacheWithInvalidator(lstore)
if err != nil {
panic(err)
}
logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas())

defaultHeaders := []string{
Expand Down
78 changes: 71 additions & 7 deletions app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/pkg/chainquery"
"github.com/OdyseeTeam/odysee-api/pkg/rpcerrors"

"github.com/eko/gocache/lib/v4/cache"
Expand All @@ -18,6 +19,11 @@ import (
"golang.org/x/sync/singleflight"
)

const (
methodTagSeparator = ":"
invalidationInterval = 15 * time.Second
)

type CacheRequest struct {
Method string
Params any
Expand All @@ -31,16 +37,46 @@ type CachedResponse struct {
type QueryCache struct {
cache *marshaler.Marshaler
singleflight *singleflight.Group
height int
stopChan chan struct{}
}

func NewQueryCache(store cache.CacheInterface[any]) *QueryCache {
marshal := marshaler.New(store)
func NewQueryCache(baseCache cache.CacheInterface[any]) *QueryCache {
marshal := marshaler.New(baseCache)
return &QueryCache{
cache: marshal,
singleflight: &singleflight.Group{},
stopChan: make(chan struct{}),
}
}

func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCache, error) {
qc := NewQueryCache(baseCache)
height, err := chainquery.GetHeight()
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc()
return nil, fmt.Errorf("failed to get current height: %w", err)
}
qc.height = height

go func() {
ticker := time.NewTicker(invalidationInterval)
for {
select {
case <-ticker.C:
err := qc.runInvalidator()
if err != nil {
logger.Log().Warn(err.Error())
}
case <-qc.stopChan:
return
}
}
}()

return qc, nil
}

func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) {
log := logger.Log()
cacheReq := CacheRequest{
Expand Down Expand Up @@ -72,10 +108,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
start := time.Now()
obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start)
ObserveQueryCacheRetrievalDuration(CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error calling getter: %w", err)
}
ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)
ObserveQueryCacheRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)

res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
Expand All @@ -94,7 +130,7 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
)
if err != nil {
ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start)
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method})
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{ParamMethod: cacheReq.Method})
log.Warnf("error during cache.set: %s", err)
return cacheResp, nil
}
Expand All @@ -103,6 +139,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached

return cacheResp, nil
}
if hit == nil {
ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start)
return nil, nil
}
log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start)
cacheResp, ok := hit.(*CachedResponse)
Expand All @@ -112,6 +152,30 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
return cacheResp, nil
}

func (c *QueryCache) runInvalidator() error {
height, err := chainquery.GetHeight()
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc()
return fmt.Errorf("failed to get current height: %w", err)
}
if c.height >= height {
return nil
}
c.height = height

ctx, cancel := context.WithTimeout(context.Background(), invalidationInterval)
defer cancel()
err = c.cache.Invalidate(ctx, store.WithInvalidateTags(
[]string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, MethodClaimSearch)},
))
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaInvalidateCall).Inc()
return fmt.Errorf("failed to invalidate %s entries: %w", MethodClaimSearch, err)
}

return nil
}

func (r CacheRequest) Expiration() time.Duration {
switch r.Method {
case MethodResolve:
Expand All @@ -124,7 +188,7 @@ func (r CacheRequest) Expiration() time.Duration {
}

func (r CacheRequest) Tags() []string {
return []string{"method:" + r.Method}
return []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, r.Method)}
}

func (r CacheRequest) GetCacheKey() string {
Expand All @@ -135,7 +199,7 @@ func (r CacheRequest) GetCacheKey() string {
params = "()"
} else {
if p, err := json.Marshal(r.Params); err != nil {
params = "(x)"
params = "(error)"
} else {
params = string(p)
}
Expand Down
1 change: 0 additions & 1 deletion app/query/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
func TestGetCacheKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
assert.Equal(1, 1)
seen := map[string]bool{}
params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil}
genCacheKey := func(params map[string]any) string {
Expand Down
23 changes: 17 additions & 6 deletions app/query/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ const (
CacheResultMiss = "miss"
CacheResultSuccess = "success"
CacheResultError = "error"

CacheAreaChainquery = "chainquery"
CacheAreaInvalidateCall = "invalidate_call"
)

var (
queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300}
cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0}
QueryCacheRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300}
cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0}

QueryCacheOperationDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -30,21 +33,29 @@ var (
},
[]string{"operation", "result", "method"},
)
QueryRetrievalDuration = promauto.NewHistogramVec(
QueryCacheRetrievalDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "query_cache",
Name: "retrieval_duration_seconds",
Help: "Latency for cold cache retrieval",
Buckets: queryRetrievalDurationBuckets,
Buckets: QueryCacheRetrievalDurationBuckets,
},
[]string{"result", "method"},
)
QueryCacheErrorCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "query_cache",
Name: "error_count",
Help: "Errors unrelated to cache setting/retrieval",
},
[]string{"area"},
)
)

func ObserveQueryCacheOperation(operation, result, method string, start time.Time) {
QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds()))
}

func ObserveQueryRetrievalDuration(result, method string, start time.Time) {
QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds()))
func ObserveQueryCacheRetrievalDuration(result, method string, start time.Time) {
QueryCacheRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds()))
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ require (
github.com/dgraph-io/ristretto v0.2.0
github.com/eko/gocache/lib/v4 v4.1.6
github.com/eko/gocache/store/redis/v4 v4.2.2
github.com/eko/gocache/store/ristretto/v4 v4.2.2
github.com/friendsofgo/errors v0.9.2
github.com/getsentry/sentry-go v0.13.0
github.com/go-chi/chi/v5 v5.0.8
github.com/go-chi/cors v1.2.1
Expand Down Expand Up @@ -61,6 +63,7 @@ require (
go.uber.org/zap v1.21.0
goa.design/goa/v3 v3.5.2
goa.design/plugins/v3 v3.4.3
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.9.0
gopkg.in/vansante/go-ffprobe.v2 v2.1.0
Expand All @@ -79,7 +82,7 @@ require (
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
)

require (
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ github.com/eko/gocache/lib/v4 v4.1.6 h1:5WWIGISKhE7mfkyF+SJyWwqa4Dp2mkdX8QsZpnEN
github.com/eko/gocache/lib/v4 v4.1.6/go.mod h1:HFxC8IiG2WeRotg09xEnPD72sCheJiTSr4Li5Ameg7g=
github.com/eko/gocache/store/redis/v4 v4.2.2 h1:Thw31fzGuH3WzJywsdbMivOmP550D6JS7GDHhvCJPA0=
github.com/eko/gocache/store/redis/v4 v4.2.2/go.mod h1:LaTxLKx9TG/YUEybQvPMij++D7PBTIJ4+pzvk0ykz0w=
github.com/eko/gocache/store/ristretto/v4 v4.2.2 h1:lXFzoZ5ck6Gy6ON7f5DHSkNt122qN7KoroCVgVwF7oo=
github.com/eko/gocache/store/ristretto/v4 v4.2.2/go.mod h1:uIvBVJzqRepr5L0RsbkfQ2iYfbyos2fuji/s4yM+aUM=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -872,6 +874,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/friendsofgo/errors v0.9.2 h1:X6NYxef4efCBdwI7BgS820zFaN7Cphrmb+Pljdzjtgk=
github.com/friendsofgo/errors v0.9.2/go.mod h1:yCvFW5AkDIL9qn7suHVLiI/gH228n7PC4Pn44IGoTOI=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
Expand Down Expand Up @@ -2504,6 +2508,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
Expand Down
83 changes: 83 additions & 0 deletions pkg/chainquery/chainquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package chainquery

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/friendsofgo/errors"
)

const (
apiUrl = "https://chainquery.odysee.tv/api/sql"
queryTimeout = 15 * time.Second
)

type HttpDoer interface {
Do(*http.Request) (*http.Response, error)
}

type HeightResponse struct {
Success bool `json:"success"`
Error *string `json:"error"`
Data []HeightData `json:"data"`
}

type HeightData struct {
Height int `json:"height"`
}

var client HttpDoer = &http.Client{
Timeout: queryTimeout,
}

func GetHeight() (int, error) {
r := HeightResponse{}
err := makeRequest(client, "select height from block order by id desc limit 1", &r)
if err != nil {
return 0, errors.Wrap(err, "error retrieving latest height")
}
if len(r.Data) != 1 {
return 0, errors.Errorf("error retrieving latest height, expected %v items in response, got %v", 1, len(r.Data))
}
return r.Data[0].Height, nil
}

func makeRequest(client HttpDoer, query string, target any) error {
baseUrl, err := url.Parse(apiUrl)
if err != nil {
return err
}
params := url.Values{}
params.Add("query", query)
baseUrl.RawQuery = params.Encode()

req, err := http.NewRequest(http.MethodGet, baseUrl.String(), nil)
if err != nil {
return fmt.Errorf("error creating request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("error sending request: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response: %w", err)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: got %v, want %v", resp.StatusCode, http.StatusOK)
}

err = json.Unmarshal(body, target)
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 17a8be7

Please sign in to comment.