Skip to content

Commit

Permalink
Merge pull request #525 from OdyseeTeam/improve-redis-cache
Browse files Browse the repository at this point in the history
Implement redis cache
  • Loading branch information
anbsky authored Nov 20, 2024
2 parents cc94100 + 2c04acf commit 5180922
Show file tree
Hide file tree
Showing 29 changed files with 832 additions and 165 deletions.
14 changes: 11 additions & 3 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/publish"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/apps/lbrytv/config"
Expand All @@ -26,6 +26,7 @@ import (
"github.com/OdyseeTeam/odysee-api/pkg/keybox"
"github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter"
"github.com/OdyseeTeam/odysee-api/pkg/redislocker"
"github.com/OdyseeTeam/odysee-api/pkg/sturdycache"
"github.com/OdyseeTeam/player-server/pkg/paid"

sentryhttp "github.com/getsentry/sentry-go/http"
Expand Down Expand Up @@ -199,10 +200,17 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
}

func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc {
queryCache, err := cache.New(cache.DefaultConfig())
store, err := sturdycache.NewReplicatedCache(
config.GetSturdyCacheMaster(),
config.GetSturdyCacheReplicas(),
config.GetSturdyCachePassword(),
)
if err != nil {
panic(err)
}
cache := query.NewQueryCache(store)
logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas())

defaultHeaders := []string{
wallet.LegacyTokenHeader, wallet.AuthorizationHeader, "X-Requested-With", "Content-Type", "Accept",
}
Expand All @@ -222,7 +230,7 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov
sdkrouter.Middleware(router),
auth.Middleware(oauthAuther), // Will pass forward user/error to next
auth.LegacyMiddleware(legacyProvider),
cache.Middleware(queryCache),
query.CacheMiddleware(cache),
)
}

Expand Down
15 changes: 0 additions & 15 deletions app/geopublish/geopublish.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package geopublish

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/OdyseeTeam/odysee-api/app/geopublish/forklift"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/internal/errors"
"github.com/OdyseeTeam/odysee-api/internal/metrics"
Expand Down Expand Up @@ -422,19 +420,6 @@ func (h *Handler) getUserFromRequest(r *http.Request) (*models.User, error) {
return h.options.userGetter.FromRequest(r)
}

func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller {
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
q := query.QueryFromContext(ctx)
params := q.ParamsAsMap()
params[fileNameParam] = filename
q.Request.Params = params
return nil, nil
}, "")
return c
}

// observeFailure requires metrics.MeasureMiddleware middleware to be present on the request
func observeFailure(d float64, kind string) {
metrics.ProxyE2ECallDurations.WithLabelValues(method).Observe(d)
Expand Down
15 changes: 6 additions & 9 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"strings"

"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/internal/audit"
"github.com/OdyseeTeam/odysee-api/internal/errors"
Expand Down Expand Up @@ -72,7 +71,7 @@ func Handle(w http.ResponseWriter, r *http.Request) {
return
}

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
writeResponse(w, rpcerrors.NewJSONParseError(errors.Err("error reading request body")).JSON())
Expand Down Expand Up @@ -117,10 +116,6 @@ func Handle(w http.ResponseWriter, r *http.Request) {
sdkAddress = rt.RandomServer().Address
}

var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
}
c := query.NewCaller(sdkAddress, userID)

remoteIP := ip.FromRequest(r)
Expand All @@ -134,7 +129,9 @@ func Handle(w http.ResponseWriter, r *http.Request) {
return nil, nil
}, "")

c.Cache = qCache
if query.HasCache(r) {
c.Cache = query.CacheFromRequest(r)
}

rpcRes, err := c.Call(query.AttachOrigin(r.Context(), origin), rpcReq)

Expand All @@ -152,7 +149,7 @@ func Handle(w http.ResponseWriter, r *http.Request) {
}
monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)})
observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet)
logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq)
logger.Log().Errorf("error calling sdk method %s: %s", rpcReq.Method, err)
return
}

Expand Down
9 changes: 4 additions & 5 deletions app/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/internal/errors"
"github.com/OdyseeTeam/odysee-api/internal/metrics"
Expand Down Expand Up @@ -160,9 +159,9 @@ retry:
}
}()

var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
var qCache *query.QueryCache
if query.HasCache(r) {
qCache = query.CacheFromRequest(r)
}

var rpcReq *jsonrpc.RPCRequest
Expand Down Expand Up @@ -223,7 +222,7 @@ retry:
observeSuccess(metrics.GetDuration(r))
}

func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller {
func getCaller(sdkAddress, filename string, userID int, qCache *query.QueryCache) *query.Caller {
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
Expand Down
9 changes: 4 additions & 5 deletions app/publish/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/internal/errors"
Expand Down Expand Up @@ -263,10 +262,10 @@ func (h TusHandler) Notify(w http.ResponseWriter, r *http.Request) {
return
}

// upload is completed, notify it to lbrynet server
var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
// upload is completed, notify lbrynet server
var qCache *query.QueryCache
if query.HasCache(r) {
qCache = query.CacheFromRequest(r)
}

var rpcReq *jsonrpc.RPCRequest
Expand Down
179 changes: 179 additions & 0 deletions app/query/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package query

import (
"context"
"crypto"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/pkg/rpcerrors"

"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
"github.com/ybbus/jsonrpc"
"golang.org/x/sync/singleflight"
)

type CacheRequest struct {
Method string
Params any
}

type CachedResponse struct {
Result any
Error *jsonrpc.RPCError
}

type QueryCache struct {
cache *marshaler.Marshaler
singleflight *singleflight.Group
}

func NewQueryCache(store cache.CacheInterface[any]) *QueryCache {
marshal := marshaler.New(store)
return &QueryCache{
cache: marshal,
singleflight: &singleflight.Group{},
}
}

func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) {
log := logger.Log()
cacheReq := CacheRequest{
Method: query.Method(),
Params: query.Params(),
}

ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()

start := time.Now()

hit, err := c.cache.Get(ctx, cacheReq, &CachedResponse{})
if err != nil {
if !errors.Is(err, &store.NotFound{}) {
ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error during cache.get: %w", err)
}

ObserveQueryCacheOperation(CacheOperationGet, CacheResultMiss, cacheReq.Method, start)

if getter == nil {
log.Warnf("nil getter provided for %s", cacheReq.Method)
return nil, nil
}

log.Infof("cache miss for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
// Cold object retrieval after cache miss
start := time.Now()
obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error calling getter: %w", err)
}
ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)

res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
return nil, errors.New("unknown type returned by getter")
}

cacheResp := &CachedResponse{Result: res.Result, Error: res.Error}
if res.Error != nil {
log.Debugf("rpc error received (%s), not caching", cacheReq.Method)
} else {
start := time.Now()
err = c.cache.Set(
ctx, cacheReq, cacheResp,
store.WithExpiration(cacheReq.Expiration()),
store.WithTags(cacheReq.Tags()),
)
if err != nil {
ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start)
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method})
log.Warnf("error during cache.set: %s", err)
return cacheResp, nil
}
ObserveQueryCacheOperation(CacheOperationSet, CacheResultSuccess, cacheReq.Method, start)
}

return cacheResp, nil
}
log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start)
cacheResp, ok := hit.(*CachedResponse)
if !ok {
return nil, errors.New("unknown cache object retrieved")
}
return cacheResp, nil
}

func (r CacheRequest) Expiration() time.Duration {
switch r.Method {
case MethodResolve:
return 600 * time.Second
case MethodClaimSearch:
return 180 * time.Second
default:
return 60 * time.Second
}
}

func (r CacheRequest) Tags() []string {
return []string{"method:" + r.Method}
}

func (r CacheRequest) GetCacheKey() string {
digester := crypto.MD5.New()
var params string

if r.Params == nil {
params = "()"
} else {
if p, err := json.Marshal(r.Params); err != nil {
params = "(x)"
} else {
params = string(p)
}
}
fmt.Fprintf(digester, "%s:%s:%s", "request", r.Method, params)
hash := digester.Sum(nil)
return fmt.Sprintf("%x", hash)
}

func (r *CachedResponse) RPCResponse(id int) *jsonrpc.RPCResponse {
return &jsonrpc.RPCResponse{
JSONRPC: "2.0",
Result: r.Result,
Error: r.Error,
ID: id,
}
}

func (r *CachedResponse) MarshalBinary() ([]byte, error) {
return json.Marshal(r)
}

func (r *CachedResponse) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, r)
}

func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
log := logger.Log()
if caller.Cache == nil {
log.Warn("no cache present on caller")
return nil, nil
}
query := QueryFromContext(ctx)
cachedResp, err := caller.Cache.Retrieve(query, func() (any, error) {
return caller.SendQuery(ctx, query)
})
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
return cachedResp.RPCResponse(query.Request.ID), nil
}
2 changes: 1 addition & 1 deletion app/query/cache/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

const ContextKey = "cache"

func IsOnRequest(r *http.Request) bool {
func HasCache(r *http.Request) bool {
return r.Context().Value(ContextKey) != nil
}

Expand Down
Loading

0 comments on commit 5180922

Please sign in to comment.