Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cache #529

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
legacyProvider := auth.NewIAPIProvider(sdkRouter, config.GetInternalAPIHost())
sentryHandler := sentryhttp.New(sentryhttp.Options{})

allMiddlewares := defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter)

r.Use(methodTimer, sentryHandler.Handle)

r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
Expand All @@ -77,7 +79,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
r.HandleFunc("", emptyHandler)

v1Router := r.PathPrefix("/api/v1").Subrouter()
v1Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
v1Router.Use(allMiddlewares)

v1Router.HandleFunc("/proxy", upHandler.Handle).MatcherFunc(publish.CanHandle)
v1Router.HandleFunc("/proxy", proxy.Handle).Methods(http.MethodPost)
Expand Down Expand Up @@ -108,7 +110,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
}

v2Router := r.PathPrefix("/api/v2").Subrouter()
v2Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
v2Router.Use(allMiddlewares)
status.InstallRoutes(v2Router)

composer := tusd.NewStoreComposer()
Expand Down Expand Up @@ -153,7 +155,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
gpl := zapadapter.NewNamedKV("geopublish", config.GetLoggingOpts())
if opts.EnableV3Publish {
v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
v3Router.Use(allMiddlewares)
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, gpl)
gPath := config.GetGeoPublishSourceDir()
v3Handler, err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/", gpl)
Expand Down Expand Up @@ -208,7 +210,15 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov
if err != nil {
panic(err)
}
cache := query.NewQueryCache(store)
lstore, err := sturdycache.AddLocalCache(store)
if err != nil {
panic(err)
}

cache, err := query.NewQueryCacheWithInvalidator(lstore)
if err != nil {
panic(err)
}
logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas())

defaultHeaders := []string{
Expand Down
81 changes: 73 additions & 8 deletions app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/pkg/chainquery"
"github.com/OdyseeTeam/odysee-api/pkg/rpcerrors"

"github.com/eko/gocache/lib/v4/cache"
Expand All @@ -18,6 +19,11 @@ import (
"golang.org/x/sync/singleflight"
)

const (
methodTagSeparator = ":"
invalidationInterval = 15 * time.Second
)

type CacheRequest struct {
Method string
Params any
Expand All @@ -31,14 +37,40 @@ type CachedResponse struct {
type QueryCache struct {
cache *marshaler.Marshaler
singleflight *singleflight.Group
height int
stopChan chan struct{}
}

func NewQueryCache(store cache.CacheInterface[any]) *QueryCache {
marshal := marshaler.New(store)
func NewQueryCache(baseCache cache.CacheInterface[any]) *QueryCache {
marshal := marshaler.New(baseCache)
return &QueryCache{
cache: marshal,
singleflight: &singleflight.Group{},
stopChan: make(chan struct{}),
}
}

func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCache, error) {
qc := NewQueryCache(baseCache)
height, err := chainquery.GetHeight()
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc()
return nil, fmt.Errorf("failed to get current height: %w", err)
}
qc.height = height
go func() {
ticker := time.NewTicker(invalidationInterval)
for {
select {
case <-ticker.C:
qc.runInvalidator()
case <-qc.stopChan:
return
}
}
}()

return qc, nil
}

func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) {
Expand Down Expand Up @@ -72,10 +104,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
start := time.Now()
obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start)
ObserveQueryCacheRetrievalDuration(CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error calling getter: %w", err)
}
ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)
ObserveQueryCacheRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)

res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
Expand All @@ -94,7 +126,7 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
)
if err != nil {
ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start)
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method})
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{ParamMethod: cacheReq.Method})
log.Warnf("error during cache.set: %s", err)
return cacheResp, nil
}
Expand All @@ -103,6 +135,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached

return cacheResp, nil
}
if hit == nil {
ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start)
return nil, nil
}
log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start)
cacheResp, ok := hit.(*CachedResponse)
Expand All @@ -112,19 +148,48 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
return cacheResp, nil
}

func (c *QueryCache) runInvalidator() error {
log := logger.Log()
height, err := chainquery.GetHeight()
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc()
return fmt.Errorf("failed to get current height: %w", err)
}
if c.height >= height {
log.Infof("block height unchanged (%v = %v), cache invalidation skipped", height, c.height)
return nil
}

log.Infof("new block height (%v > %v), running invalidation", height, c.height)
c.height = height

ctx, cancel := context.WithTimeout(context.Background(), invalidationInterval)
defer cancel()
err = c.cache.Invalidate(ctx, store.WithInvalidateTags(
[]string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, MethodClaimSearch)},
))
if err != nil {
QueryCacheErrorCount.WithLabelValues(CacheAreaInvalidateCall).Inc()
log.Warnf("failed to invalidate %s entries: %s", MethodClaimSearch, err)
return fmt.Errorf("failed to invalidate %s entries: %w", MethodClaimSearch, err)
}

return nil
}

func (r CacheRequest) Expiration() time.Duration {
switch r.Method {
case MethodResolve:
return 600 * time.Second
case MethodClaimSearch:
return 180 * time.Second
return 300 * time.Second
default:
return 60 * time.Second
}
}

func (r CacheRequest) Tags() []string {
return []string{"method:" + r.Method}
return []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, r.Method)}
}

func (r CacheRequest) GetCacheKey() string {
Expand All @@ -135,7 +200,7 @@ func (r CacheRequest) GetCacheKey() string {
params = "()"
} else {
if p, err := json.Marshal(r.Params); err != nil {
params = "(x)"
params = "(error)"
} else {
params = string(p)
}
Expand Down
1 change: 0 additions & 1 deletion app/query/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
func TestGetCacheKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
assert.Equal(1, 1)
seen := map[string]bool{}
params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil}
genCacheKey := func(params map[string]any) string {
Expand Down
2 changes: 1 addition & 1 deletion app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP
return nil, rpcerrors.NewSDKError(err)
}
if res != nil {
logger.Log().Infof("got %s response from %s hook", q.Method(), hook.name)
logger.Log().Debugf("got %s response from %s hook", q.Method(), hook.name)
return res, nil
}
}
Expand Down
Loading
Loading