diff --git a/api/routes.go b/api/routes.go index b9647e87..585a823f 100644 --- a/api/routes.go +++ b/api/routes.go @@ -13,7 +13,7 @@ import ( gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics" "github.com/OdyseeTeam/odysee-api/app/proxy" "github.com/OdyseeTeam/odysee-api/app/publish" - "github.com/OdyseeTeam/odysee-api/app/query/cache" + "github.com/OdyseeTeam/odysee-api/app/query" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/app/wallet" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" @@ -26,6 +26,7 @@ import ( "github.com/OdyseeTeam/odysee-api/pkg/keybox" "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter" "github.com/OdyseeTeam/odysee-api/pkg/redislocker" + "github.com/OdyseeTeam/odysee-api/pkg/sturdycache" "github.com/OdyseeTeam/player-server/pkg/paid" sentryhttp "github.com/getsentry/sentry-go/http" @@ -199,10 +200,17 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio } func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc { - queryCache, err := cache.New(cache.DefaultConfig()) + store, err := sturdycache.NewReplicatedCache( + config.GetSturdyCacheMaster(), + config.GetSturdyCacheReplicas(), + config.GetSturdyCachePassword(), + ) if err != nil { panic(err) } + cache := query.NewQueryCache(store) + logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas()) + defaultHeaders := []string{ wallet.LegacyTokenHeader, wallet.AuthorizationHeader, "X-Requested-With", "Content-Type", "Accept", } @@ -222,7 +230,7 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov sdkrouter.Middleware(router), auth.Middleware(oauthAuther), // Will pass forward user/error to next auth.LegacyMiddleware(legacyProvider), - cache.Middleware(queryCache), + query.CacheMiddleware(cache), ) } diff --git a/app/geopublish/geopublish.go b/app/geopublish/geopublish.go index a0f9a66a..8bccb98f 100644 --- a/app/geopublish/geopublish.go +++ b/app/geopublish/geopublish.go @@ -1,7 +1,6 @@ package geopublish import ( - "context" "database/sql" "encoding/json" "fmt" @@ -14,7 +13,6 @@ import ( "github.com/OdyseeTeam/odysee-api/app/geopublish/forklift" "github.com/OdyseeTeam/odysee-api/app/proxy" "github.com/OdyseeTeam/odysee-api/app/query" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/internal/errors" "github.com/OdyseeTeam/odysee-api/internal/metrics" @@ -422,19 +420,6 @@ func (h *Handler) getUserFromRequest(r *http.Request) (*models.User, error) { return h.options.userGetter.FromRequest(r) } -func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller { - c := query.NewCaller(sdkAddress, userID) - c.Cache = qCache - c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - q := query.QueryFromContext(ctx) - params := q.ParamsAsMap() - params[fileNameParam] = filename - q.Request.Params = params - return nil, nil - }, "") - return c -} - // observeFailure requires metrics.MeasureMiddleware middleware to be present on the request func observeFailure(d float64, kind string) { metrics.ProxyE2ECallDurations.WithLabelValues(method).Observe(d) diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index 5b880880..83d40d1f 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -10,13 +10,12 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strings" "github.com/OdyseeTeam/odysee-api/app/auth" "github.com/OdyseeTeam/odysee-api/app/query" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/internal/audit" "github.com/OdyseeTeam/odysee-api/internal/errors" @@ -72,7 +71,7 @@ func Handle(w http.ResponseWriter, r *http.Request) { return } - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) writeResponse(w, rpcerrors.NewJSONParseError(errors.Err("error reading request body")).JSON()) @@ -117,10 +116,6 @@ func Handle(w http.ResponseWriter, r *http.Request) { sdkAddress = rt.RandomServer().Address } - var qCache *cache.Cache - if cache.IsOnRequest(r) { - qCache = cache.FromRequest(r) - } c := query.NewCaller(sdkAddress, userID) remoteIP := ip.FromRequest(r) @@ -134,7 +129,9 @@ func Handle(w http.ResponseWriter, r *http.Request) { return nil, nil }, "") - c.Cache = qCache + if query.HasCache(r) { + c.Cache = query.CacheFromRequest(r) + } rpcRes, err := c.Call(query.AttachOrigin(r.Context(), origin), rpcReq) @@ -152,7 +149,7 @@ func Handle(w http.ResponseWriter, r *http.Request) { } monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)}) observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet) - logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq) + logger.Log().Errorf("error calling sdk method %s: %s", rpcReq.Method, err) return } diff --git a/app/publish/publish.go b/app/publish/publish.go index cb700dc1..f1ec85e4 100644 --- a/app/publish/publish.go +++ b/app/publish/publish.go @@ -14,7 +14,6 @@ import ( "github.com/OdyseeTeam/odysee-api/app/auth" "github.com/OdyseeTeam/odysee-api/app/proxy" "github.com/OdyseeTeam/odysee-api/app/query" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/internal/errors" "github.com/OdyseeTeam/odysee-api/internal/metrics" @@ -160,9 +159,9 @@ retry: } }() - var qCache *cache.Cache - if cache.IsOnRequest(r) { - qCache = cache.FromRequest(r) + var qCache *query.QueryCache + if query.HasCache(r) { + qCache = query.CacheFromRequest(r) } var rpcReq *jsonrpc.RPCRequest @@ -223,7 +222,7 @@ retry: observeSuccess(metrics.GetDuration(r)) } -func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller { +func getCaller(sdkAddress, filename string, userID int, qCache *query.QueryCache) *query.Caller { c := query.NewCaller(sdkAddress, userID) c.Cache = qCache c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { diff --git a/app/publish/tus.go b/app/publish/tus.go index 337f8855..cb90a3d4 100644 --- a/app/publish/tus.go +++ b/app/publish/tus.go @@ -12,7 +12,6 @@ import ( "github.com/OdyseeTeam/odysee-api/app/auth" "github.com/OdyseeTeam/odysee-api/app/proxy" "github.com/OdyseeTeam/odysee-api/app/query" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/app/wallet" "github.com/OdyseeTeam/odysee-api/internal/errors" @@ -263,10 +262,10 @@ func (h TusHandler) Notify(w http.ResponseWriter, r *http.Request) { return } - // upload is completed, notify it to lbrynet server - var qCache *cache.Cache - if cache.IsOnRequest(r) { - qCache = cache.FromRequest(r) + // upload is completed, notify lbrynet server + var qCache *query.QueryCache + if query.HasCache(r) { + qCache = query.CacheFromRequest(r) } var rpcReq *jsonrpc.RPCRequest diff --git a/app/query/cache.go b/app/query/cache.go new file mode 100644 index 00000000..ff569595 --- /dev/null +++ b/app/query/cache.go @@ -0,0 +1,179 @@ +package query + +import ( + "context" + "crypto" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/OdyseeTeam/odysee-api/internal/monitor" + "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" + + "github.com/eko/gocache/lib/v4/cache" + "github.com/eko/gocache/lib/v4/marshaler" + "github.com/eko/gocache/lib/v4/store" + "github.com/ybbus/jsonrpc" + "golang.org/x/sync/singleflight" +) + +type CacheRequest struct { + Method string + Params any +} + +type CachedResponse struct { + Result any + Error *jsonrpc.RPCError +} + +type QueryCache struct { + cache *marshaler.Marshaler + singleflight *singleflight.Group +} + +func NewQueryCache(store cache.CacheInterface[any]) *QueryCache { + marshal := marshaler.New(store) + return &QueryCache{ + cache: marshal, + singleflight: &singleflight.Group{}, + } +} + +func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) { + log := logger.Log() + cacheReq := CacheRequest{ + Method: query.Method(), + Params: query.Params(), + } + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + start := time.Now() + + hit, err := c.cache.Get(ctx, cacheReq, &CachedResponse{}) + if err != nil { + if !errors.Is(err, &store.NotFound{}) { + ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start) + return nil, fmt.Errorf("error during cache.get: %w", err) + } + + ObserveQueryCacheOperation(CacheOperationGet, CacheResultMiss, cacheReq.Method, start) + + if getter == nil { + log.Warnf("nil getter provided for %s", cacheReq.Method) + return nil, nil + } + + log.Infof("cache miss for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) + // Cold object retrieval after cache miss + start := time.Now() + obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter) + if err != nil { + ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start) + return nil, fmt.Errorf("error calling getter: %w", err) + } + ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) + + res, ok := obj.(*jsonrpc.RPCResponse) + if !ok { + return nil, errors.New("unknown type returned by getter") + } + + cacheResp := &CachedResponse{Result: res.Result, Error: res.Error} + if res.Error != nil { + log.Debugf("rpc error received (%s), not caching", cacheReq.Method) + } else { + start := time.Now() + err = c.cache.Set( + ctx, cacheReq, cacheResp, + store.WithExpiration(cacheReq.Expiration()), + store.WithTags(cacheReq.Tags()), + ) + if err != nil { + ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start) + monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method}) + log.Warnf("error during cache.set: %s", err) + return cacheResp, nil + } + ObserveQueryCacheOperation(CacheOperationSet, CacheResultSuccess, cacheReq.Method, start) + } + + return cacheResp, nil + } + log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) + ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start) + cacheResp, ok := hit.(*CachedResponse) + if !ok { + return nil, errors.New("unknown cache object retrieved") + } + return cacheResp, nil +} + +func (r CacheRequest) Expiration() time.Duration { + switch r.Method { + case MethodResolve: + return 600 * time.Second + case MethodClaimSearch: + return 180 * time.Second + default: + return 60 * time.Second + } +} + +func (r CacheRequest) Tags() []string { + return []string{"method:" + r.Method} +} + +func (r CacheRequest) GetCacheKey() string { + digester := crypto.MD5.New() + var params string + + if r.Params == nil { + params = "()" + } else { + if p, err := json.Marshal(r.Params); err != nil { + params = "(x)" + } else { + params = string(p) + } + } + fmt.Fprintf(digester, "%s:%s:%s", "request", r.Method, params) + hash := digester.Sum(nil) + return fmt.Sprintf("%x", hash) +} + +func (r *CachedResponse) RPCResponse(id int) *jsonrpc.RPCResponse { + return &jsonrpc.RPCResponse{ + JSONRPC: "2.0", + Result: r.Result, + Error: r.Error, + ID: id, + } +} + +func (r *CachedResponse) MarshalBinary() ([]byte, error) { + return json.Marshal(r) +} + +func (r *CachedResponse) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, r) +} + +func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { + log := logger.Log() + if caller.Cache == nil { + log.Warn("no cache present on caller") + return nil, nil + } + query := QueryFromContext(ctx) + cachedResp, err := caller.Cache.Retrieve(query, func() (any, error) { + return caller.SendQuery(ctx, query) + }) + if err != nil { + return nil, rpcerrors.NewSDKError(err) + } + return cachedResp.RPCResponse(query.Request.ID), nil +} diff --git a/app/query/cache/middleware.go b/app/query/cache/middleware.go index 2f1ba5e8..58409ff4 100644 --- a/app/query/cache/middleware.go +++ b/app/query/cache/middleware.go @@ -9,7 +9,7 @@ import ( const ContextKey = "cache" -func IsOnRequest(r *http.Request) bool { +func HasCache(r *http.Request) bool { return r.Context().Value(ContextKey) != nil } diff --git a/app/query/cache_test.go b/app/query/cache_test.go new file mode 100644 index 00000000..d1621692 --- /dev/null +++ b/app/query/cache_test.go @@ -0,0 +1,37 @@ +package query + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ybbus/jsonrpc" +) + +func TestGetCacheKey(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + assert.Equal(1, 1) + seen := map[string]bool{} + params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil} + genCacheKey := func(params map[string]any) string { + req := jsonrpc.NewRequest(MethodResolve, params) + query, err := NewQuery(req, "") + require.NoError(err) + cacheReq := CacheRequest{ + Method: query.Method(), + Params: query.Params(), + } + return cacheReq.GetCacheKey() + } + for _, p := range params { + t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) { + cacheKey := genCacheKey(p) + assert.Len(cacheKey, 32) + assert.NotContains(seen, cacheKey) + seen[cacheKey] = true + }) + } + assert.Contains(seen, genCacheKey(params[1])) +} diff --git a/app/query/caller.go b/app/query/caller.go index 02c439d1..d90d7dbd 100644 --- a/app/query/caller.go +++ b/app/query/caller.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/app/wallet" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" @@ -58,7 +57,7 @@ type Caller struct { postflightHooks []hookEntry // Cache stores cacheable queries to improve performance - Cache *cache.Cache + Cache *QueryCache Duration float64 @@ -110,7 +109,7 @@ func (c *Caller) getRPCClient(method string) jsonrpc.RPCClient { // to JSON-RPC server altogether. func (c *Caller) AddPreflightHook(method string, hf Hook, name string) { c.preflightHooks = append(c.preflightHooks, hookEntry{method, hf, name}) - logger.Log().Debugf("added a preflight hook for method %v", method) + logger.Log().Debugf("added a preflight hook for method %s, %s", method, name) } // AddPostflightHook adds query postflight hook function, @@ -118,7 +117,7 @@ func (c *Caller) AddPreflightHook(method string, hf Hook, name string) { // or to modify log entry fields. func (c *Caller) AddPostflightHook(method string, hf Hook, name string) { c.postflightHooks = append(c.postflightHooks, hookEntry{method, hf, name}) - logger.Log().Debugf("added a postflight hook for method %v", method) + logger.Log().Debugf("added a postflight hook for method %s, %s", method, name) } func (c *Caller) addDefaultHooks() { @@ -130,8 +129,13 @@ func (c *Caller) addDefaultHooks() { c.AddPostflightHook(MethodClaimSearch, postClaimSearchArfleetThumbs, builtinHookName) c.AddPostflightHook(MethodResolve, postResolveArfleetThumbs, builtinHookName) } + + // This should be applied after all preflight hooks had a chance + c.AddPreflightHook(MethodResolve, preflightCacheHook, "cache") + c.AddPreflightHook(MethodClaimSearch, preflightCacheHook, "cache") } +// CloneWithoutHook is for testing and debugging purposes. func (c *Caller) CloneWithoutHook(endpoint, method, name string) *Caller { cc := NewCaller(endpoint, c.userID) for _, h := range c.postflightHooks { @@ -153,8 +157,8 @@ func (c *Caller) Endpoint() string { return c.endpoint } -// Call method forwards a JSON-RPC request to the lbrynet server. -// It returns a response that is ready to be sent back to the JSON-RPC client as is. +// Call method takes JSON-RPC request through a set of hooks and forwards it to lbrynet server. +// It returns a response that is ready to be sent back to the JSON-RPC client. func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) { origin := OriginFromContext(ctx) metrics.ProxyCallCounter.WithLabelValues(req.Method, c.Endpoint(), origin).Inc() @@ -167,8 +171,6 @@ func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP return res, err } -// Call method forwards a JSON-RPC request to the lbrynet server. -// It returns a response that is ready to be sent back to the JSON-RPC client as is. func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) { if c.endpoint == "" { return nil, errors.Err("cannot call blank endpoint") @@ -184,7 +186,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP return nil, err } - // Applying preflight hooks + // Applying preflight hooks, if any one of them returns, this will be returned as response var res *jsonrpc.RPCResponse ctx = AttachQuery(ctx, q) for _, hook := range c.preflightHooks { @@ -194,34 +196,16 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP return nil, rpcerrors.NewSDKError(err) } if res != nil { + logger.Log().Infof("got %s response from %s hook", q.Method(), hook.name) return res, nil } } } - if res == nil { - // Attempt to retrieve the result from cache, retrieving and setting it if it's missing, - // and only send the query directly if it's still missing after the cache call somehow. - var ires interface{} - retriever := func() (interface{}, error) { return c.SendQuery(ctx, q) } - if q.IsCacheable() && c.Cache != nil { - ires, err = c.Cache.Retrieve(q.Method(), q.Params(), retriever) - if err != nil { - return nil, rpcerrors.NewSDKError(err) - } - res, _ = ires.(*jsonrpc.RPCResponse) - } - if res == nil { - res, err = c.SendQuery(ctx, q) - } - if err != nil { - return nil, rpcerrors.NewSDKError(err) - } - } - - return res, nil + return c.SendQuery(ctx, q) } +// SendQuery is where the actual RPC call happens, bypassing all hooks and retrying in case of "wallet not loaded" errors. func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse, error) { var ( r *jsonrpc.RPCResponse diff --git a/app/query/caller_test.go b/app/query/caller_test.go index 8b43d3a9..88cb57e2 100644 --- a/app/query/caller_test.go +++ b/app/query/caller_test.go @@ -8,18 +8,18 @@ import ( "testing" "time" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/app/wallet" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" "github.com/OdyseeTeam/odysee-api/internal/errors" "github.com/OdyseeTeam/odysee-api/internal/test" "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" + "github.com/OdyseeTeam/odysee-api/pkg/sturdycache" "github.com/OdyseeTeam/player-server/pkg/paid" - "github.com/Pallinder/go-randomdata" ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc" + "github.com/Pallinder/go-randomdata" "github.com/sirupsen/logrus" logrusTest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -381,38 +381,40 @@ func TestCaller_CloneWithoutHook(t *testing.T) { func TestCaller_CallCachingResponses(t *testing.T) { var err error + srv := test.MockHTTPServer(nil) defer srv.Close() - srv.NextResponse <- ` - { - "jsonrpc": "2.0", - "result": { - "blocked": { - "channels": [], - "total": 0 - }, - "items": [ - { - "address": "bHz3LpVcuadmbK8g6VVUszF9jjH4pxG2Ct", - "amount": "0.5", - "canonical_url": "lbry://@lbry#3f/youtube-is-over-lbry-odysee-are-here#4" - } - ] - }, - "id": 0 - } - ` + + srv.NextResponse <- resolveResponseFree c := NewCaller(srv.URL, 0) - c.Cache, err = cache.New(cache.DefaultConfig()) + + cache, _, _, teardown := sturdycache.CreateTestCache(t) + defer teardown() + c.Cache = NewQueryCache(cache) + require.NoError(t, err) + + rpcReq := jsonrpc.NewRequest("resolve", map[string]any{"urls": "what"}) + rpcResponse, err := c.Call(bgctx(), rpcReq) require.NoError(t, err) - rpcResponse, err := c.Call(bgctx(), jsonrpc.NewRequest("claim_search", map[string]any{"urls": "what"})) + assert.Nil(t, rpcResponse.Error) + + expResponse, err := decodeResponse(resolveResponseFree) + require.NoError(t, err) + assert.EqualValues(t, expResponse.Result, rpcResponse.Result) + + srv.NextResponse <- resolveResponseCouldntFind + + rpcReq2 := jsonrpc.NewRequest("resolve", map[string]any{"urls": "one"}) + rpcResponse2, err := c.Call(bgctx(), rpcReq2) require.NoError(t, err) assert.Nil(t, rpcResponse.Error) - c.Cache.Wait() - cResp, err := c.Cache.Retrieve("claim_search", map[string]any{"urls": "what"}, nil) + + expResponse2, err := decodeResponse(resolveResponseCouldntFind) require.NoError(t, err) - assert.NotNil(t, cResp.(*jsonrpc.RPCResponse).Result) + assert.Nil(t, rpcResponse2.Error) + assert.EqualValues(t, expResponse2.Result, rpcResponse2.Result) + } func TestCaller_CallNotCachingErrors(t *testing.T) { @@ -430,16 +432,21 @@ func TestCaller_CallNotCachingErrors(t *testing.T) { }` c := NewCaller(srv.URL, 0) - c.Cache, err = cache.New(cache.DefaultConfig()) + cache, _, _, teardown := sturdycache.CreateTestCache(t) + defer teardown() + c.Cache = NewQueryCache(cache) require.NoError(t, err) - rpcResponse, err := c.Call(bgctx(), jsonrpc.NewRequest("claim_search", map[string]any{"urls": "what"})) + + rpcReq := jsonrpc.NewRequest("claim_search", map[string]any{"urls": "what"}) + rpcResponse, err := c.Call(bgctx(), rpcReq) require.NoError(t, err) assert.Equal(t, rpcResponse.Error.Code, -32000) time.Sleep(500 * time.Millisecond) - cResp, err := c.Cache.Retrieve( - "claim_search", - map[string]any{"urls": "what"}, - func() (any, error) { return nil, nil }) + + q, err := NewQuery(rpcReq, "") + require.NoError(t, err) + + cResp, err := c.Cache.Retrieve(q, nil) require.NoError(t, err) assert.Nil(t, cResp) } @@ -1012,9 +1019,10 @@ func TestCaller_JSONRPCNotCut(t *testing.T) { "id": 0 } ` - c := NewCaller(srv.URL, 0) - c.Cache, err = cache.New(cache.DefaultConfig()) + cache, _, _, teardown := sturdycache.CreateTestCache(t) + defer teardown() + c.Cache = NewQueryCache(cache) require.NoError(t, err) channelIds := []any{"1234", "4321", "5678", "8765", "9999", "0000", "1111"} @@ -1112,3 +1120,10 @@ func TestCaller_preflightHookClaimSearch(t *testing.T) { }) } } + +// func TestMain(m *testing.M) { +// var err error + +// code := m.Run() +// os.Exit(code) +// } diff --git a/app/query/metrics.go b/app/query/metrics.go new file mode 100644 index 00000000..659c8292 --- /dev/null +++ b/app/query/metrics.go @@ -0,0 +1,50 @@ +package query + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + CacheOperationGet = "get" + CacheOperationSet = "set" + + CacheResultHit = "hit" + CacheResultMiss = "miss" + CacheResultSuccess = "success" + CacheResultError = "error" +) + +var ( + queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} + cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} + + QueryCacheOperationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "query_cache", + Name: "operation_duration_seconds", + Help: "Cache operation latency", + Buckets: cacheDurationBuckets, + }, + []string{"operation", "result", "method"}, + ) + QueryRetrievalDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "query_cache", + Name: "retrieval_duration_seconds", + Help: "Latency for cold cache retrieval", + Buckets: queryRetrievalDurationBuckets, + }, + []string{"result", "method"}, + ) +) + +func ObserveQueryCacheOperation(operation, result, method string, start time.Time) { + QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds())) +} + +func ObserveQueryRetrievalDuration(result, method string, start time.Time) { + QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) +} diff --git a/app/query/middleware.go b/app/query/middleware.go new file mode 100644 index 00000000..7075349e --- /dev/null +++ b/app/query/middleware.go @@ -0,0 +1,34 @@ +package query + +import ( + "context" + "net/http" + + "github.com/gorilla/mux" +) + +type cacheKey struct{} + +func HasCache(r *http.Request) bool { + return r.Context().Value(cacheKey{}) != nil +} + +func CacheFromRequest(r *http.Request) *QueryCache { + v := r.Context().Value(cacheKey{}) + if v == nil { + panic("query.CacheMiddleware is required") + } + return v.(*QueryCache) +} + +func AddCacheToRequest(cache *QueryCache, fn http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + fn(w, r.Clone(context.WithValue(r.Context(), cacheKey{}, cache))) + } +} + +func CacheMiddleware(cache *QueryCache) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return AddCacheToRequest(cache, next.ServeHTTP) + } +} diff --git a/app/query/testing.go b/app/query/testing.go new file mode 100644 index 00000000..15f3933c --- /dev/null +++ b/app/query/testing.go @@ -0,0 +1,16 @@ +package query + +import ( + "encoding/json" + "strings" + + "github.com/ybbus/jsonrpc" +) + +func decodeResponse(r string) (*jsonrpc.RPCResponse, error) { + decoder := json.NewDecoder(strings.NewReader(r)) + decoder.DisallowUnknownFields() + decoder.UseNumber() + response := &jsonrpc.RPCResponse{} + return response, decoder.Decode(response) +} diff --git a/apps/lbrytv/config/config.go b/apps/lbrytv/config/config.go index f606c153..405f30ea 100644 --- a/apps/lbrytv/config/config.go +++ b/apps/lbrytv/config/config.go @@ -81,6 +81,18 @@ func GetAsynqueryRequestsConnOpts() (asynq.RedisConnOpt, error) { return asynq.ParseRedisURI(Config.Viper.GetString("AsynqueryRequestsConnURL")) } +func GetSturdyCacheMaster() string { + return Config.Viper.GetString("sturdycache.master") +} + +func GetSturdyCacheReplicas() []string { + return Config.Viper.GetStringSlice("sturdycache.replicas") +} + +func GetSturdyCachePassword() string { + return Config.Viper.GetString("sturdycache.password") +} + // GetDatabase returns postgresql database server connection config. func GetDatabase() cfg.DBConfig { return Config.GetDatabase() diff --git a/cmd/serve.go b/cmd/serve.go index 35c0d26a..0c8e0cd1 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -4,9 +4,7 @@ import ( "fmt" "io/ioutil" "log" - "math/rand" "os" - "time" "github.com/OdyseeTeam/odysee-api/api" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" @@ -22,13 +20,12 @@ var rootCmd = &cobra.Command{ Use: "oapi", Short: "backend server for Odysee frontend", Run: func(_ *cobra.Command, _ []string) { - rand.Seed(time.Now().UnixNano()) // always seed random! sdkRouter := sdkrouter.New(config.GetLbrynetServers()) go sdkRouter.WatchLoad() s := server.NewServer(config.GetAddress(), sdkRouter, &api.RoutesOptions{ EnableProfiling: config.GetProfiling(), - EnableV3Publish: true, + EnableV3Publish: false, }) err := s.Start() if err != nil { diff --git a/docker-compose.app.yml b/docker-compose.app.yml index b8bfafb4..4b7408a5 100644 --- a/docker-compose.app.yml +++ b/docker-compose.app.yml @@ -1,5 +1,3 @@ -version: "3.2" - services: oapi: image: odyseeteam/odysee-api:latest @@ -15,3 +13,6 @@ services: - postgres labels: com.centurylinklabs.watchtower.enable: true + +volumes: + storage: {} diff --git a/docker/launcher.sh b/docker/launcher.sh index c7551219..8a137568 100755 --- a/docker/launcher.sh +++ b/docker/launcher.sh @@ -1,5 +1,9 @@ #!/bin/sh -ssh-keygen -t rsa -f token_privkey.rsa -m pem +file="token_privkey.rsa" +if [ ! -f "$file" ]; then + ssh-keygen -t rsa -f "$file" -m pem +fi + ./oapi db_migrate_up ./oapi diff --git a/docker/oapi.yml b/docker/oapi.yml index 41120761..0bbea39f 100644 --- a/docker/oapi.yml +++ b/docker/oapi.yml @@ -25,9 +25,9 @@ StreamsV6: InternalAPIHost: https://api.lbry.com ProjectURL: https://lbry.tv -DatabaseDSN: postgres://lbrytv:lbrytv@postgres +DatabaseDSN: postgres://postgres:odyseeteam@postgres Database: - DBName: lbrytv + DBName: postgres Options: sslmode=disable OAuth: @@ -56,12 +56,18 @@ RPCTimeouts: transaction_list: 4m publish: 4m -RedisLocker: redis://:odyredis@localhost:6379/1 -RedisBus: redis://:odyredis@localhost:6379/2 +RedisLocker: redis://:odyredis@redis:6379/1 +RedisBus: redis://:odyredis@redis:6379/2 # AsynqueryRequestsConnURL is Redis database where asynquery will be listening for finalized uploads requests. # This corresponds to AsynqueryRequestsConnURL in forklift.yml config. -AsynqueryRequestsConnURL: redis://:odyredis@localhost:6379/3 +AsynqueryRequestsConnURL: redis://:odyredis@redis:6379/3 + +SturdyCache: + Master: redis:6379 + Replicas: + - redis:6379 + Password: odyredis ReflectorUpstream: DatabaseDSN: 'user:password@tcp(localhost:3306)/blobs' diff --git a/go.mod b/go.mod index 8eaf081d..52690c0a 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,9 @@ require ( github.com/bluele/factory-go v0.0.1 github.com/btcsuite/btcd v0.22.0-beta github.com/coreos/go-oidc v2.2.1+incompatible - github.com/dgraph-io/ristretto v0.1.0 + github.com/dgraph-io/ristretto v0.2.0 + github.com/eko/gocache/lib/v4 v4.1.6 + github.com/eko/gocache/store/redis/v4 v4.2.2 github.com/getsentry/sentry-go v0.13.0 github.com/go-chi/chi/v5 v5.0.8 github.com/go-chi/cors v1.2.1 @@ -60,7 +62,7 @@ require ( goa.design/goa/v3 v3.5.2 goa.design/plugins/v3 v3.4.3 golang.org/x/oauth2 v0.6.0 - golang.org/x/sync v0.1.0 + golang.org/x/sync v0.9.0 gopkg.in/vansante/go-ffprobe.v2 v2.1.0 gopkg.in/yaml.v2 v2.4.0 logur.dev/logur v0.17.0 @@ -68,10 +70,16 @@ require ( require ( dario.cat/mergo v1.0.1 // indirect + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/golang/mock v1.6.0 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/moby/sys/user v0.3.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect ) require ( @@ -79,6 +87,7 @@ require ( github.com/Microsoft/go-winio v0.6.0 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/ajg/form v1.5.1 // indirect + github.com/alicebob/miniredis/v2 v2.33.0 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect @@ -103,7 +112,7 @@ require ( github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect github.com/cockroachdb/errors v1.9.0 // indirect @@ -119,7 +128,7 @@ require ( github.com/docker/docker v25.0.6+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -138,7 +147,6 @@ require ( github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect - github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gomodule/redigo v1.8.8 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect @@ -204,12 +212,12 @@ require ( go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.23.0 // indirect - golang.org/x/mod v0.9.0 // indirect + golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.7.0 // indirect + golang.org/x/tools v0.20.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect diff --git a/go.sum b/go.sum index c7e3c00c..f0428ee8 100644 --- a/go.sum +++ b/go.sum @@ -588,6 +588,10 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= +github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= github.com/anbsky/lbry.go/v3 v3.0.6 h1:N7JwcSqWpQdO6nBfo8J6JVd/w2RMlhlKM8My6qDwzuA= github.com/anbsky/lbry.go/v3 v3.0.6/go.mod h1:WakZg1CiiOhsinlnSA5jNVLWSllYodx+gsBqExtJBv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= @@ -714,8 +718,9 @@ github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= @@ -798,11 +803,12 @@ github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0 github.com/decred/dcrd/lru v1.1.1/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= -github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= +github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE= +github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= @@ -822,13 +828,18 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/eko/gocache/lib/v4 v4.1.6 h1:5WWIGISKhE7mfkyF+SJyWwqa4Dp2mkdX8QsZpnENqJI= +github.com/eko/gocache/lib/v4 v4.1.6/go.mod h1:HFxC8IiG2WeRotg09xEnPD72sCheJiTSr4Li5Ameg7g= +github.com/eko/gocache/store/redis/v4 v4.2.2 h1:Thw31fzGuH3WzJywsdbMivOmP550D6JS7GDHhvCJPA0= +github.com/eko/gocache/store/redis/v4 v4.2.2/go.mod h1:LaTxLKx9TG/YUEybQvPMij++D7PBTIJ4+pzvk0ykz0w= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -1095,7 +1106,6 @@ github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -1887,6 +1897,9 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/vimeo/go-util v1.4.1/go.mod h1:r+yspV//C48HeMXV8nEvtUeNiIiGfVv3bbEHzOgudwE= github.com/vmihailenco/msgpack/v5 v5.3.2/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/volatiletech/inflect v0.0.0-20170731032912-e7201282ae8d/go.mod h1:jspfvgf53t5NLUT4o9L1IX0kIBNKamGq1tWc/MgWK9Q= github.com/volatiletech/inflect v0.0.1 h1:2a6FcMQyhmPZcLa+uet3VJ8gLn/9svWhJxJYwvE8KsU= @@ -1921,6 +1934,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zach-klippenstein/goregen v0.0.0-20160303162051-795b5e3961ea h1:CyhwejzVGvZ3Q2PSbQ4NRRYn+ZWv5eS1vlaEusT+bAI= github.com/zach-klippenstein/goregen v0.0.0-20160303162051-795b5e3961ea/go.mod h1:eNr558nEUjP8acGw8FFjTeWvSgU1stO7FAO6eknhHe4= @@ -2043,6 +2058,8 @@ golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8H golang.org/x/exp v0.0.0-20211123021643-48cbe7f80d7c/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps= golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf/go.mod h1:yh0Ynu2b5ZUe3MQfp2nM0ecK7wsgouWTDN0FNeJuIys= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -2088,8 +2105,8 @@ golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= -golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2221,8 +2238,9 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2477,8 +2495,8 @@ golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= -golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= +golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= +golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 7f7cac2c..6c7596ae 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -7,17 +7,18 @@ import ( ) const ( - nsPlayer = "player" - nsAPI = "api" - nsIAPI = "iapi" - nsAuth = "auth" - nsProxy = "proxy" - nsLbrynext = "lbrynext" - nsLbrynet = "lbrynet" - nsUI = "ui" - nsLbrytv = "lbrytv" - nsOperations = "op" - nsPublish = "publish" + nsPlayer = "player" + nsAPI = "api" + nsIAPI = "iapi" + nsAuth = "auth" + nsProxy = "proxy" + nsSturdyCache = "sturdycache" + nsLbrynext = "lbrynext" + nsLbrynet = "lbrynet" + nsUI = "ui" + nsLbrytv = "lbrytv" + nsOperations = "op" + nsPublish = "publish" LabelSource = "source" LabelInstance = "instance" @@ -46,7 +47,7 @@ const ( ) var ( - callsSecondsBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300} + callDurationBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300} IAPIAuthSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: nsIAPI, @@ -84,7 +85,7 @@ var ( Subsystem: "e2e_calls", Name: "total_seconds", Help: "End-to-end method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method"}, ) @@ -94,7 +95,7 @@ var ( Subsystem: "e2e_calls", Name: "failed_seconds", Help: "Failed end-to-end method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "kind"}, ) @@ -123,7 +124,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "Method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "origin"}, ) @@ -133,7 +134,7 @@ var ( Subsystem: "calls", Name: "failed_seconds", Help: "Failed method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "origin", "kind"}, ) @@ -202,7 +203,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "How long do calls to lbrytv take (end-to-end)", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"path"}, ) @@ -258,7 +259,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "Method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "group"}, ) @@ -268,7 +269,7 @@ var ( Subsystem: "calls", Name: "failed_seconds", Help: "Failed method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "group", "kind"}, ) diff --git a/internal/status/status.go b/internal/status/status.go index 1b67f859..514ef254 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -9,7 +9,6 @@ import ( "github.com/OdyseeTeam/odysee-api/app/auth" "github.com/OdyseeTeam/odysee-api/app/query" - "github.com/OdyseeTeam/odysee-api/app/query/cache" "github.com/OdyseeTeam/odysee-api/app/sdkrouter" "github.com/OdyseeTeam/odysee-api/app/wallet" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" @@ -17,6 +16,7 @@ import ( "github.com/OdyseeTeam/odysee-api/internal/monitor" "github.com/OdyseeTeam/odysee-api/internal/responses" "github.com/OdyseeTeam/odysee-api/models" + "github.com/gorilla/mux" "github.com/ybbus/jsonrpc" ) @@ -92,7 +92,6 @@ func StatusV2(w http.ResponseWriter, r *http.Request) { var ( userID int - qCache *cache.Cache lbrynetServer *models.LbrynetServer ) rt := sdkrouter.New(config.GetLbrynetServers()) @@ -106,24 +105,23 @@ func StatusV2(w http.ResponseWriter, r *http.Request) { srv := serverItem{Name: lbrynetServer.Name, Status: statusOK} - if cache.IsOnRequest(r) { - qCache = cache.FromRequest(r) + c := query.NewCaller(lbrynetServer.Address, userID) + if query.HasCache(r) { + c.Cache = query.CacheFromRequest(r) } - c := query.NewCaller(lbrynetServer.Address, userID) - c.Cache = qCache - rpcRes, err := c.Call(r.Context(), jsonrpc.NewRequest("resolve", map[string]interface{}{"urls": resolveURL})) + rpcRes, err := c.Call(r.Context(), jsonrpc.NewRequest(query.MethodResolve, map[string]interface{}{"urls": resolveURL})) if err != nil { srv.Error = err.Error() srv.Status = statusOffline failureDetected = true - logger.Log().Error("we're failing: ", err) + logger.Log().Errorf("status call resolve is failing: %s", err) } else if rpcRes.Error != nil { srv.Error = rpcRes.Error.Message srv.Status = statusNotReady failureDetected = true - logger.Log().Error("we're failing: ", err) + logger.Log().Errorf("status call resolve is failing: %s", err) } else { if user != nil { response.User = &userData{ diff --git a/internal/test/test.go b/internal/test/test.go index 782b78ff..c49a65d7 100644 --- a/internal/test/test.go +++ b/internal/test/test.go @@ -3,7 +3,7 @@ package test import ( "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -45,7 +45,7 @@ func MockHTTPServer(requestChan chan *Request) *mockServer { Server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() if requestChan != nil { - data, _ := ioutil.ReadAll(r.Body) + data, _ := io.ReadAll(r.Body) requestChan <- &Request{r, w, string(data)} } fmt.Fprintf(w, <-next) diff --git a/internal/test/test_test.go b/internal/test/test_test.go index 400542aa..eb4a2486 100644 --- a/internal/test/test_test.go +++ b/internal/test/test_test.go @@ -2,7 +2,7 @@ package test import ( "bytes" - "io/ioutil" + "io" "net/http" "testing" @@ -39,7 +39,7 @@ func TestMockHTTPServer(t *testing.T) { req2 := <-reqChan assert.Equal(t, req2.R.Method, http.MethodPost) assert.Equal(t, req2.Body, `hello`) - body, err := ioutil.ReadAll(res2.Body) + body, err := io.ReadAll(res2.Body) require.NoError(t, err) assert.Equal(t, string(body), "ok") } diff --git a/oapi.yml b/oapi.yml index b8f48dac..540e2605 100644 --- a/oapi.yml +++ b/oapi.yml @@ -65,6 +65,12 @@ RedisBus: redis://:odyredis@localhost:6379/2 # This corresponds to AsynqueryRequestsConnURL in forklift.yml config. AsynqueryRequestsConnURL: redis://:odyredis@localhost:6379/3 +SturdyCache: + Master: localhost:6379 + Replicas: + - localhost:6379 + Password: odyredis + ReflectorUpstream: DatabaseDSN: 'user:password@tcp(localhost:3306)/blobs' Region: us-east-1 diff --git a/pkg/sturdycache/sturdycache.go b/pkg/sturdycache/sturdycache.go new file mode 100644 index 00000000..8c755ecb --- /dev/null +++ b/pkg/sturdycache/sturdycache.go @@ -0,0 +1,94 @@ +package sturdycache + +import ( + "context" + + "github.com/eko/gocache/lib/v4/cache" + "github.com/eko/gocache/lib/v4/store" + redis_store "github.com/eko/gocache/store/redis/v4" + "github.com/redis/go-redis/v9" +) + +type ReplicatedCache struct { + masterCache *cache.Cache[any] + replicaCaches []*cache.Cache[any] +} + +// NewReplicatedCache creates a new gocache store instance for redis master-replica setups. +// Requires one master server address and one or more replica addresses. +func NewReplicatedCache( + masterAddr string, + replicaAddrs []string, + password string, +) (*ReplicatedCache, error) { + masterClient := redis.NewClient(&redis.Options{ + Addr: masterAddr, + Password: password, + DB: 0, + PoolSize: 200, + MinIdleConns: 10, + }) + + masterStore := redis_store.NewRedis(masterClient) + masterCache := cache.New[any](masterStore) + + replicaCaches := make([]*cache.Cache[any], len(replicaAddrs)) + + for i, addr := range replicaAddrs { + replicaClient := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: 0, + // PoolSize: 10, + // MinIdleConns: 5, + }) + + replicaStore := redis_store.NewRedis(replicaClient) + replicaCaches[i] = cache.New[any](replicaStore) + } + + cache := &ReplicatedCache{ + masterCache: masterCache, + replicaCaches: replicaCaches, + } + + return cache, nil +} + +// Set writes to master. +func (rc *ReplicatedCache) Set(ctx context.Context, key any, value any, options ...store.Option) error { + return rc.masterCache.Set(ctx, key, value, options...) +} + +// Get tries replicas first, falls back to master. +func (rc *ReplicatedCache) Get(ctx context.Context, key any) (any, error) { + for _, replica := range rc.replicaCaches { + value, err := replica.Get(ctx, key) + if err == nil { + return value, nil + } + } + + // Fallback to master + return rc.masterCache.Get(ctx, key) +} + +// Invalidate master cache entries for given options. +func (rc *ReplicatedCache) Invalidate(ctx context.Context, options ...store.InvalidateOption) error { + return rc.masterCache.Invalidate(ctx, options...) +} + +// Delete from master cache. +func (rc *ReplicatedCache) Delete(ctx context.Context, key any) error { + return rc.masterCache.Delete(ctx, key) +} + +// Clear master cache. +func (rc *ReplicatedCache) Clear(ctx context.Context) error { + return rc.masterCache.Clear(ctx) +} + +// GetType returns cache type name. +func (rc *ReplicatedCache) GetType() string { + return "replicated_redis" +} diff --git a/pkg/sturdycache/sturdycache_test.go b/pkg/sturdycache/sturdycache_test.go new file mode 100644 index 00000000..eeeaccbe --- /dev/null +++ b/pkg/sturdycache/sturdycache_test.go @@ -0,0 +1,181 @@ +package sturdycache + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/eko/gocache/lib/v4/store" + "github.com/stretchr/testify/suite" +) + +type ReplicatedCacheTestSuite struct { + suite.Suite + + master *miniredis.Miniredis + replicas []*miniredis.Miniredis + cache *ReplicatedCache + teardownFunc teardownFunc + ctx context.Context + cancel context.CancelFunc +} + +type TestStruct struct { + ID int `json:"id"` + Name string `json:"name"` +} + +func (t TestStruct) MarshalBinary() ([]byte, error) { + return json.Marshal(t) +} + +func (t *TestStruct) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, t) +} + +func (s *ReplicatedCacheTestSuite) SetupTest() { + s.cache, s.master, s.replicas, s.teardownFunc = CreateTestCache(s.T()) +} + +func (s *ReplicatedCacheTestSuite) TearDownTest() { + s.teardownFunc() +} +func (s *ReplicatedCacheTestSuite) SetupSuite() { + s.ctx, s.cancel = context.WithTimeout(context.Background(), 30*time.Second) +} + +func (s *ReplicatedCacheTestSuite) TearDownSuite() { + s.cancel() +} + +func (s *ReplicatedCacheTestSuite) TestNewReplicatedCache() { + s.Require().NotNil(s.cache) + s.Require().NotNil(s.cache.masterCache) + s.Require().Len(s.cache.replicaCaches, len(s.replicas)) +} + +func (s *ReplicatedCacheTestSuite) TestSet() { + err := s.cache.Set(s.ctx, "key1", "value1") + s.Require().NoError(err) + + val, err := s.master.Get("key1") + s.Require().NoError(err) + s.Require().Contains(val, "value1") + + err = s.cache.Set(s.ctx, "key2", "value2", store.WithExpiration(time.Minute)) + s.Require().NoError(err) + + ttl := s.master.TTL("key2") + s.Require().True(ttl > 0) +} + +func (s *ReplicatedCacheTestSuite) TestGet() { + testKey := "test-key" + testValue := "test-value" + + err := s.cache.Set(s.ctx, testKey, testValue) + s.Require().NoError(err) + + masterValue, err := s.master.Get(testKey) + s.Require().NoError(err) + + for _, r := range s.replicas { + r.Set(testKey, masterValue) + } + + value, err := s.cache.Get(s.ctx, testKey) + s.Require().NoError(err) + s.Require().Equal(testValue, value) +} + +func (s *ReplicatedCacheTestSuite) TestGetWithReplicaFailures() { + testKey := "test-key" + testValue := "test-value" + + err := s.cache.Set(s.ctx, testKey, testValue) + s.Require().NoError(err) + + // Manually replicate to replicas + masterValue, err := s.master.Get(testKey) + s.Require().NoError(err) + + for _, r := range s.replicas { + r.Set(testKey, masterValue) + } + + // Test scenario: replicas fail one by one + for i, r := range s.replicas { + s.T().Logf("Testing with replica %d down", i) + + r.Close() + + value, err := s.cache.Get(s.ctx, testKey) + s.Require().NoError(err) + s.Require().Equal(testValue, value) + } + + // Test with all replicas down + value, err := s.cache.Get(s.ctx, testKey) + s.Require().NoError(err) + s.Require().Equal(testValue, value, "Should get value from master when all replicas are down") +} + +func (s *ReplicatedCacheTestSuite) TestClear() { + for i := range 3 { + key := fmt.Sprintf("key-%d", i) + err := s.cache.Set(s.ctx, key, fmt.Sprintf("value-%d", i)) + s.Require().NoError(err) + } + + s.Require().Greater(len(s.master.Keys()), 0) + + err := s.cache.Clear(s.ctx) + s.Require().NoError(err) + + s.Require().Equal(0, len(s.master.Keys())) +} + +func (s *ReplicatedCacheTestSuite) TestInvalidate() { + for i := range 5 { + key := fmt.Sprintf("key-%d", i) + err := s.cache.Set(s.ctx, key, fmt.Sprintf("value-%d", i), store.WithTags([]string{fmt.Sprintf("tag-%d", i)})) + s.Require().NoError(err) + } + + s.Require().Greater(len(s.master.Keys()), 0) + + err := s.cache.Invalidate(s.ctx, store.WithInvalidateTags([]string{"tag-1", "tag-2"})) + s.Require().NoError(err) + + _, err = s.cache.Get(s.ctx, "key-1") + s.Require().True(errors.Is(err, &store.NotFound{})) + _, err = s.cache.Get(s.ctx, "key-2") + s.Require().True(errors.Is(err, &store.NotFound{})) +} +func (s *ReplicatedCacheTestSuite) TestGetNonExistentKey() { + _, err := s.cache.Get(s.ctx, "non-existent-key") + s.Require().True(errors.Is(err, &store.NotFound{})) +} + +func (s *ReplicatedCacheTestSuite) TestSetStructValue() { + + testValue := TestStruct{ + ID: 1, + Name: "test", + } + + err := s.cache.Set(s.ctx, "struct-key", testValue) + s.Require().NoError(err) + + value, err := s.cache.Get(s.ctx, "struct-key") + s.Require().NoError(err) + s.Require().NotNil(value) +} + +func TestReplicatedCacheTestSuite(t *testing.T) { + suite.Run(t, new(ReplicatedCacheTestSuite)) +} diff --git a/pkg/sturdycache/testing.go b/pkg/sturdycache/testing.go new file mode 100644 index 00000000..6285143e --- /dev/null +++ b/pkg/sturdycache/testing.go @@ -0,0 +1,38 @@ +package sturdycache + +import ( + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/require" +) + +type teardownFunc func() + +func CreateTestCache(t *testing.T) (*ReplicatedCache, *miniredis.Miniredis, []*miniredis.Miniredis, teardownFunc) { + require := require.New(t) + master := miniredis.RunT(t) + + replicas := make([]*miniredis.Miniredis, 3) + for i := range 3 { + replicas[i] = miniredis.RunT(t) + } + + replicaAddrs := make([]string, len(replicas)) + for i, r := range replicas { + replicaAddrs[i] = r.Addr() + } + + cache, err := NewReplicatedCache( + master.Addr(), + replicaAddrs, + "", + ) + require.NoError(err) + return cache, master, replicas, func() { + master.Close() + for _, r := range replicas { + r.Close() + } + } +} diff --git a/readme.md b/readme.md index 823a76ed..72b891f3 100644 --- a/readme.md +++ b/readme.md @@ -50,7 +50,7 @@ Make sure you have recent enough Docker and `docker compose` installed. This will pull and launch SDK and postgres images, which Odysee API requires to operate. -`docker compose -f docker-compose.yml -f docker compose.app.yml up -d` +`docker compose -f docker-compose.yml -f docker-compose.app.yml up -d` *Note: if you're running a LBRY desktop app or lbrynet instance, you will have to either shut it down or change ports*