diff --git a/api/routes.go b/api/routes.go index 585a823f..485e9a76 100644 --- a/api/routes.go +++ b/api/routes.go @@ -69,6 +69,8 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio legacyProvider := auth.NewIAPIProvider(sdkRouter, config.GetInternalAPIHost()) sentryHandler := sentryhttp.New(sentryhttp.Options{}) + allMiddlewares := defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter) + r.Use(methodTimer, sentryHandler.Handle) r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { @@ -77,7 +79,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio r.HandleFunc("", emptyHandler) v1Router := r.PathPrefix("/api/v1").Subrouter() - v1Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter)) + v1Router.Use(allMiddlewares) v1Router.HandleFunc("/proxy", upHandler.Handle).MatcherFunc(publish.CanHandle) v1Router.HandleFunc("/proxy", proxy.Handle).Methods(http.MethodPost) @@ -108,7 +110,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio } v2Router := r.PathPrefix("/api/v2").Subrouter() - v2Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter)) + v2Router.Use(allMiddlewares) status.InstallRoutes(v2Router) composer := tusd.NewStoreComposer() @@ -153,7 +155,7 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio gpl := zapadapter.NewNamedKV("geopublish", config.GetLoggingOpts()) if opts.EnableV3Publish { v3Router := r.PathPrefix("/api/v3").Subrouter() - v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter)) + v3Router.Use(allMiddlewares) ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, gpl) gPath := config.GetGeoPublishSourceDir() v3Handler, err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/", gpl) @@ -208,7 +210,15 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov if err != nil { panic(err) } - cache := query.NewQueryCache(store) + lstore, err := sturdycache.AddLocalCache(store) + if err != nil { + panic(err) + } + + cache, err := query.NewQueryCacheWithInvalidator(lstore) + if err != nil { + panic(err) + } logger.Log().Infof("cache configured: master=%s, replicas=%s", config.GetSturdyCacheMaster(), config.GetSturdyCacheReplicas()) defaultHeaders := []string{ diff --git a/app/query/cache.go b/app/query/cache.go index 1606c9ec..53ea47a0 100644 --- a/app/query/cache.go +++ b/app/query/cache.go @@ -9,6 +9,7 @@ import ( "time" "github.com/OdyseeTeam/odysee-api/internal/monitor" + "github.com/OdyseeTeam/odysee-api/pkg/chainquery" "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" "github.com/eko/gocache/lib/v4/cache" @@ -18,6 +19,11 @@ import ( "golang.org/x/sync/singleflight" ) +const ( + methodTagSeparator = ":" + invalidationInterval = 15 * time.Second +) + type CacheRequest struct { Method string Params any @@ -31,14 +37,40 @@ type CachedResponse struct { type QueryCache struct { cache *marshaler.Marshaler singleflight *singleflight.Group + height int + stopChan chan struct{} } -func NewQueryCache(store cache.CacheInterface[any]) *QueryCache { - marshal := marshaler.New(store) +func NewQueryCache(baseCache cache.CacheInterface[any]) *QueryCache { + marshal := marshaler.New(baseCache) return &QueryCache{ cache: marshal, singleflight: &singleflight.Group{}, + stopChan: make(chan struct{}), + } +} + +func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCache, error) { + qc := NewQueryCache(baseCache) + height, err := chainquery.GetHeight() + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc() + return nil, fmt.Errorf("failed to get current height: %w", err) } + qc.height = height + go func() { + ticker := time.NewTicker(invalidationInterval) + for { + select { + case <-ticker.C: + qc.runInvalidator() + case <-qc.stopChan: + return + } + } + }() + + return qc, nil } func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) { @@ -72,10 +104,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached start := time.Now() obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter) if err != nil { - ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start) + ObserveQueryCacheRetrievalDuration(CacheResultError, cacheReq.Method, start) return nil, fmt.Errorf("error calling getter: %w", err) } - ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) + ObserveQueryCacheRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) res, ok := obj.(*jsonrpc.RPCResponse) if !ok { @@ -94,7 +126,7 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached ) if err != nil { ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start) - monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method}) + monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{ParamMethod: cacheReq.Method}) log.Warnf("error during cache.set: %s", err) return cacheResp, nil } @@ -103,6 +135,10 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached return cacheResp, nil } + if hit == nil { + ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start) + return nil, nil + } log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start) cacheResp, ok := hit.(*CachedResponse) @@ -112,19 +148,48 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached return cacheResp, nil } +func (c *QueryCache) runInvalidator() error { + log := logger.Log() + height, err := chainquery.GetHeight() + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaChainquery).Inc() + return fmt.Errorf("failed to get current height: %w", err) + } + if c.height >= height { + log.Infof("block height unchanged (%v = %v), cache invalidation skipped", height, c.height) + return nil + } + + log.Infof("new block height (%v > %v), running invalidation", height, c.height) + c.height = height + + ctx, cancel := context.WithTimeout(context.Background(), invalidationInterval) + defer cancel() + err = c.cache.Invalidate(ctx, store.WithInvalidateTags( + []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, MethodClaimSearch)}, + )) + if err != nil { + QueryCacheErrorCount.WithLabelValues(CacheAreaInvalidateCall).Inc() + log.Warnf("failed to invalidate %s entries: %s", MethodClaimSearch, err) + return fmt.Errorf("failed to invalidate %s entries: %w", MethodClaimSearch, err) + } + + return nil +} + func (r CacheRequest) Expiration() time.Duration { switch r.Method { case MethodResolve: return 600 * time.Second case MethodClaimSearch: - return 180 * time.Second + return 300 * time.Second default: return 60 * time.Second } } func (r CacheRequest) Tags() []string { - return []string{"method:" + r.Method} + return []string{fmt.Sprintf("%s%s%s", ParamMethod, methodTagSeparator, r.Method)} } func (r CacheRequest) GetCacheKey() string { @@ -135,7 +200,7 @@ func (r CacheRequest) GetCacheKey() string { params = "()" } else { if p, err := json.Marshal(r.Params); err != nil { - params = "(x)" + params = "(error)" } else { params = string(p) } diff --git a/app/query/cache_test.go b/app/query/cache_test.go index d1621692..b3fc0f29 100644 --- a/app/query/cache_test.go +++ b/app/query/cache_test.go @@ -12,7 +12,6 @@ import ( func TestGetCacheKey(t *testing.T) { assert := assert.New(t) require := require.New(t) - assert.Equal(1, 1) seen := map[string]bool{} params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil} genCacheKey := func(params map[string]any) string { diff --git a/app/query/caller.go b/app/query/caller.go index 90380bfb..0f9e0fd1 100644 --- a/app/query/caller.go +++ b/app/query/caller.go @@ -196,7 +196,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP return nil, rpcerrors.NewSDKError(err) } if res != nil { - logger.Log().Infof("got %s response from %s hook", q.Method(), hook.name) + logger.Log().Debugf("got %s response from %s hook", q.Method(), hook.name) return res, nil } } diff --git a/app/query/const.go b/app/query/const.go index bf77cdb2..d78e512e 100644 --- a/app/query/const.go +++ b/app/query/const.go @@ -4,21 +4,78 @@ const ( cacheResolveLongerThan = 10 maxListSizeLogged = 5 - MethodAccountList = "account_list" - MethodClaimSearch = "claim_search" - MethodCommentReactList = "comment_react_list" - MethodFileList = "file_list" - MethodGet = "get" - MethodPublish = "publish" - MethodPurchaseCreate = "purchase_create" - MethodResolve = "resolve" - MethodStatus = "status" - MethodStreamUpdate = "stream_update" - MethodStreamCreate = "stream_create" - MethodSyncApply = "sync_apply" - MethodWalletBalance = "wallet_balance" - MethodWalletSend = "wallet_send" - + // Method constants + MethodAccountList = "account_list" + MethodClaimSearch = "claim_search" + MethodCommentReactList = "comment_react_list" + MethodFileList = "file_list" + MethodGet = "get" + MethodPublish = "publish" + MethodPurchaseCreate = "purchase_create" + MethodResolve = "resolve" + MethodStatus = "status" + MethodStreamUpdate = "stream_update" + MethodStreamCreate = "stream_create" + MethodSyncApply = "sync_apply" + MethodWalletBalance = "wallet_balance" + MethodWalletSend = "wallet_send" + MethodBlobAnnounce = "blob_announce" + MethodTransactionShow = "transaction_show" + MethodStreamCostEstimate = "stream_cost_estimate" + MethodCommentList = "comment_list" + MethodCollectionResolve = "collection_resolve" + MethodVersion = "version" + MethodRoutingTableGet = "routing_table_get" + MethodAddressUnused = "address_unused" + MethodAddressList = "address_list" + MethodAddressIsMine = "address_is_mine" + MethodAccountBalance = "account_balance" + MethodAccountSend = "account_send" + MethodAccountMaxAddressGap = "account_max_address_gap" + MethodChannelAbandon = "channel_abandon" + MethodChannelCreate = "channel_create" + MethodChannelList = "channel_list" + MethodChannelUpdate = "channel_update" + MethodChannelExport = "channel_export" + MethodChannelImport = "channel_import" + MethodChannelSign = "channel_sign" + MethodCollectionList = "collection_list" + MethodCollectionCreate = "collection_create" + MethodCollectionUpdate = "collection_update" + MethodCollectionAbandon = "collection_abandon" + MethodCommentAbandon = "comment_abandon" + MethodCommentCreate = "comment_create" + MethodCommentHide = "comment_hide" + MethodCommentUpdate = "comment_update" + MethodCommentReact = "comment_react" + MethodCommentPin = "comment_pin" + MethodClaimList = "claim_list" + MethodStreamAbandon = "stream_abandon" + MethodStreamRepost = "stream_repost" + MethodStreamList = "stream_list" + MethodSupportAbandon = "support_abandon" + MethodSupportCreate = "support_create" + MethodSupportList = "support_list" + MethodSyncHash = "sync_hash" + MethodPreferenceGet = "preference_get" + MethodPreferenceSet = "preference_set" + MethodPurchaseList = "purchase_list" + MethodTransactionList = "transaction_list" + MethodTxoList = "txo_list" + MethodTxoSpend = "txo_spend" + MethodTxoSum = "txo_sum" + MethodTxoPlot = "txo_plot" + MethodUtxoList = "utxo_list" + MethodUtxoRelease = "utxo_release" + MethodWalletList = "wallet_list" + MethodWalletEncrypt = "wallet_encrypt" + MethodWalletDecrypt = "wallet_decrypt" + MethodWalletLock = "wallet_lock" + MethodWalletUnlock = "wallet_unlock" + MethodWalletStatus = "wallet_status" + + // Parameter constants + ParamMethod = "method" ParamAccountID = "account_id" ParamChannelID = "channel_id" ParamNewSDKServer = "new_sdk_server" @@ -31,101 +88,79 @@ const ( var forbiddenParams = []string{ParamAccountID, ParamNewSDKServer} -// relaxedMethods are methods which are allowed to be called without wallet_id. var relaxedMethods = []string{ - "blob_announce", + MethodBlobAnnounce, MethodStatus, MethodResolve, MethodGet, - "transaction_show", - "stream_cost_estimate", - "claim_search", - "comment_list", - "collection_resolve", + MethodTransactionShow, + MethodStreamCostEstimate, + MethodClaimSearch, + MethodCommentList, + MethodCollectionResolve, MethodCommentReactList, - "version", - "routing_table_get", + MethodVersion, + MethodRoutingTableGet, } -// walletSpecificMethods are methods which require wallet_id. -// This list will inevitably turn stale sooner or later as new methods -// are added to the SDK so relaxedMethods should be used for strict validation -// whether wallet_id is required. var walletSpecificMethods = []string{ MethodGet, MethodPurchaseCreate, - - "resolve", - "claim_search", - + MethodResolve, + MethodClaimSearch, MethodPublish, - - "address_unused", - "address_list", - "address_is_mine", - - "account_list", - "account_balance", - "account_send", - "account_max_address_gap", - - "channel_abandon", - "channel_create", - "channel_list", - "channel_update", - "channel_export", - "channel_import", - "channel_sign", - - "collection_list", - "collection_create", - "collection_update", - "collection_abandon", - - "comment_abandon", - "comment_create", - "comment_hide", - "comment_update", - "comment_react", - "comment_pin", + MethodAddressUnused, + MethodAddressList, + MethodAddressIsMine, + MethodAccountList, + MethodAccountBalance, + MethodAccountSend, + MethodAccountMaxAddressGap, + MethodChannelAbandon, + MethodChannelCreate, + MethodChannelList, + MethodChannelUpdate, + MethodChannelExport, + MethodChannelImport, + MethodChannelSign, + MethodCollectionList, + MethodCollectionCreate, + MethodCollectionUpdate, + MethodCollectionAbandon, + MethodCommentAbandon, + MethodCommentCreate, + MethodCommentHide, + MethodCommentUpdate, + MethodCommentReact, + MethodCommentPin, MethodCommentReactList, - - "claim_list", - - "stream_abandon", + MethodClaimList, + MethodStreamAbandon, MethodStreamCreate, - "stream_list", + MethodStreamList, MethodStreamUpdate, - "stream_repost", - - "support_abandon", - "support_create", - "support_list", - + MethodStreamRepost, + MethodSupportAbandon, + MethodSupportCreate, + MethodSupportList, MethodSyncApply, - "sync_hash", - - "preference_get", - "preference_set", - - "purchase_list", - - "transaction_list", - - "txo_list", - "txo_spend", - "txo_sum", - "txo_plot", - - "utxo_list", - "utxo_release", - - "wallet_list", - "wallet_send", - "wallet_balance", - "wallet_encrypt", - "wallet_decrypt", - "wallet_lock", - "wallet_unlock", - "wallet_status", + MethodSyncHash, + MethodPreferenceGet, + MethodPreferenceSet, + MethodPurchaseList, + MethodTransactionList, + MethodTxoList, + MethodTxoSpend, + MethodTxoSum, + MethodTxoPlot, + MethodUtxoList, + MethodUtxoRelease, + MethodWalletList, + MethodWalletSend, + MethodWalletBalance, + MethodWalletEncrypt, + MethodWalletDecrypt, + MethodWalletLock, + MethodWalletUnlock, + MethodWalletStatus, } diff --git a/app/query/metrics.go b/app/query/metrics.go index 659c8292..2eaea2af 100644 --- a/app/query/metrics.go +++ b/app/query/metrics.go @@ -15,11 +15,14 @@ const ( CacheResultMiss = "miss" CacheResultSuccess = "success" CacheResultError = "error" + + CacheAreaChainquery = "chainquery" + CacheAreaInvalidateCall = "invalidate_call" ) var ( - queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} - cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} + QueryCacheRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} + cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} QueryCacheOperationDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ @@ -30,21 +33,29 @@ var ( }, []string{"operation", "result", "method"}, ) - QueryRetrievalDuration = promauto.NewHistogramVec( + QueryCacheRetrievalDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "query_cache", Name: "retrieval_duration_seconds", Help: "Latency for cold cache retrieval", - Buckets: queryRetrievalDurationBuckets, + Buckets: QueryCacheRetrievalDurationBuckets, }, []string{"result", "method"}, ) + QueryCacheErrorCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "query_cache", + Name: "error_count", + Help: "Errors unrelated to cache setting/retrieval", + }, + []string{"area"}, + ) ) func ObserveQueryCacheOperation(operation, result, method string, start time.Time) { QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds())) } -func ObserveQueryRetrievalDuration(result, method string, start time.Time) { - QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) +func ObserveQueryCacheRetrievalDuration(result, method string, start time.Time) { + QueryCacheRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) } diff --git a/go.mod b/go.mod index 52690c0a..86d68cb3 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,8 @@ require ( github.com/dgraph-io/ristretto v0.2.0 github.com/eko/gocache/lib/v4 v4.1.6 github.com/eko/gocache/store/redis/v4 v4.2.2 + github.com/eko/gocache/store/ristretto/v4 v4.2.2 + github.com/friendsofgo/errors v0.9.2 github.com/getsentry/sentry-go v0.13.0 github.com/go-chi/chi/v5 v5.0.8 github.com/go-chi/cors v1.2.1 @@ -61,6 +63,7 @@ require ( go.uber.org/zap v1.21.0 goa.design/goa/v3 v3.5.2 goa.design/plugins/v3 v3.4.3 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/oauth2 v0.6.0 golang.org/x/sync v0.9.0 gopkg.in/vansante/go-ffprobe.v2 v2.1.0 @@ -79,7 +82,7 @@ require ( github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect ) require ( diff --git a/go.sum b/go.sum index f0428ee8..865added 100644 --- a/go.sum +++ b/go.sum @@ -840,6 +840,8 @@ github.com/eko/gocache/lib/v4 v4.1.6 h1:5WWIGISKhE7mfkyF+SJyWwqa4Dp2mkdX8QsZpnEN github.com/eko/gocache/lib/v4 v4.1.6/go.mod h1:HFxC8IiG2WeRotg09xEnPD72sCheJiTSr4Li5Ameg7g= github.com/eko/gocache/store/redis/v4 v4.2.2 h1:Thw31fzGuH3WzJywsdbMivOmP550D6JS7GDHhvCJPA0= github.com/eko/gocache/store/redis/v4 v4.2.2/go.mod h1:LaTxLKx9TG/YUEybQvPMij++D7PBTIJ4+pzvk0ykz0w= +github.com/eko/gocache/store/ristretto/v4 v4.2.2 h1:lXFzoZ5ck6Gy6ON7f5DHSkNt122qN7KoroCVgVwF7oo= +github.com/eko/gocache/store/ristretto/v4 v4.2.2/go.mod h1:uIvBVJzqRepr5L0RsbkfQ2iYfbyos2fuji/s4yM+aUM= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -872,6 +874,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/friendsofgo/errors v0.9.2 h1:X6NYxef4efCBdwI7BgS820zFaN7Cphrmb+Pljdzjtgk= +github.com/friendsofgo/errors v0.9.2/go.mod h1:yCvFW5AkDIL9qn7suHVLiI/gH228n7PC4Pn44IGoTOI= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -2504,6 +2508,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= diff --git a/pkg/chainquery/chainquery.go b/pkg/chainquery/chainquery.go new file mode 100644 index 00000000..d1aea1ac --- /dev/null +++ b/pkg/chainquery/chainquery.go @@ -0,0 +1,83 @@ +package chainquery + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/friendsofgo/errors" +) + +const ( + apiUrl = "https://chainquery.odysee.tv/api/sql" + queryTimeout = 15 * time.Second +) + +type HttpDoer interface { + Do(*http.Request) (*http.Response, error) +} + +type HeightResponse struct { + Success bool `json:"success"` + Error *string `json:"error"` + Data []HeightData `json:"data"` +} + +type HeightData struct { + Height int `json:"height"` +} + +var client HttpDoer = &http.Client{ + Timeout: queryTimeout, +} + +func GetHeight() (int, error) { + r := HeightResponse{} + err := makeRequest(client, "select height from block order by id desc limit 1", &r) + if err != nil { + return 0, errors.Wrap(err, "error retrieving latest height") + } + if len(r.Data) != 1 { + return 0, errors.Errorf("error retrieving latest height, expected %v items in response, got %v", 1, len(r.Data)) + } + return r.Data[0].Height, nil +} + +func makeRequest(client HttpDoer, query string, target any) error { + baseUrl, err := url.Parse(apiUrl) + if err != nil { + return err + } + params := url.Values{} + params.Add("query", query) + baseUrl.RawQuery = params.Encode() + + req, err := http.NewRequest(http.MethodGet, baseUrl.String(), nil) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("error sending request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: got %v, want %v", resp.StatusCode, http.StatusOK) + } + + err = json.Unmarshal(body, target) + if err != nil { + return err + } + return nil +} diff --git a/pkg/chainquery/chainquery_test.go b/pkg/chainquery/chainquery_test.go new file mode 100644 index 00000000..c1bc6915 --- /dev/null +++ b/pkg/chainquery/chainquery_test.go @@ -0,0 +1,54 @@ +package chainquery + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type failureTestClient struct{} + +func (_ failureTestClient) Do(*http.Request) (*http.Response, error) { + data := HeightResponse{ + Success: true, + Error: nil, + Data: []HeightData{}, + } + + body, _ := json.Marshal(data) + + return &http.Response{ + Status: "200 OK", + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil +} + +func TestGetHeight(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + + height, err := GetHeight() + require.NoError(err) + assert.Greater(height, 1500_000) +} + +func TestGetHeightFailure(t *testing.T) { + assert := assert.New(t) + + origClient := client + client = failureTestClient{} + defer func() { + client = origClient + }() + + height, err := GetHeight() + assert.ErrorContains(err, "error retrieving latest height, expected 1 items in response, got 0") + assert.Equal(0, height) +} diff --git a/pkg/sturdycache/sturdycache.go b/pkg/sturdycache/sturdycache.go index dbba0035..693f782d 100644 --- a/pkg/sturdycache/sturdycache.go +++ b/pkg/sturdycache/sturdycache.go @@ -2,14 +2,19 @@ package sturdycache import ( "context" + "time" + "github.com/dgraph-io/ristretto" "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" redis_store "github.com/eko/gocache/store/redis/v4" + ristretto_store "github.com/eko/gocache/store/ristretto/v4" "github.com/redis/go-redis/v9" "golang.org/x/exp/rand" ) +const ReplicatedCacheType = "redis" + type ReplicatedCache struct { masterCache *cache.Cache[any] replicaCaches []*cache.Cache[any] @@ -23,6 +28,7 @@ func NewReplicatedCache( replicaAddrs []string, password string, ) (*ReplicatedCache, error) { + masterClient := redis.NewClient(&redis.Options{ Addr: masterAddr, Password: password, @@ -49,12 +55,29 @@ func NewReplicatedCache( replicaCaches = append(replicaCaches, cache.New[any](replicaStore)) } - cache := &ReplicatedCache{ + baseStore := &ReplicatedCache{ masterCache: masterCache, replicaCaches: replicaCaches, readCaches: append(replicaCaches, masterCache), } + return baseStore, nil +} + +// AddLocalCache adds a local in-memory cache layer to a replicated cache instance. +func AddLocalCache(baseCache *ReplicatedCache) (cache.CacheInterface[any], error) { + // About 50k resolve responses with average size of 10KB + ristrettoCache, err := ristretto.NewCache(&ristretto.Config{NumCounters: 500_000, MaxCost: 500_000_000, BufferItems: 64}) + if err != nil { + return nil, err + } + ristrettoStore := ristretto_store.NewRistretto(ristrettoCache) + + cache := cache.NewChain[any]( + cache.New[any](ristrettoStore), + cache.New[any](baseCache), + ) + return cache, nil } @@ -68,6 +91,11 @@ func (rc *ReplicatedCache) Get(ctx context.Context, key any) (any, error) { return rc.readCaches[rand.Intn(len(rc.readCaches))].Get(ctx, key) } +// Get reads from master and replica caches. +func (rc *ReplicatedCache) GetWithTTL(ctx context.Context, key any) (any, time.Duration, error) { + return rc.readCaches[rand.Intn(len(rc.readCaches))].GetWithTTL(ctx, key) +} + // Invalidate master cache entries for given options. func (rc *ReplicatedCache) Invalidate(ctx context.Context, options ...store.InvalidateOption) error { return rc.masterCache.Invalidate(ctx, options...) @@ -85,5 +113,5 @@ func (rc *ReplicatedCache) Clear(ctx context.Context) error { // GetType returns cache type name. func (rc *ReplicatedCache) GetType() string { - return "replicated_redis" + return ReplicatedCacheType } diff --git a/pkg/sturdycache/sturdycache_test.go b/pkg/sturdycache/sturdycache_test.go index fb72f9b8..b3eb2bc0 100644 --- a/pkg/sturdycache/sturdycache_test.go +++ b/pkg/sturdycache/sturdycache_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" "github.com/stretchr/testify/suite" ) @@ -16,12 +17,13 @@ import ( type ReplicatedCacheTestSuite struct { suite.Suite - master *miniredis.Miniredis - replicas []*miniredis.Miniredis - cache *ReplicatedCache - teardownFunc teardownFunc - ctx context.Context - cancel context.CancelFunc + master *miniredis.Miniredis + replicas []*miniredis.Miniredis + cache cache.CacheInterface[any] + replicatedCache *ReplicatedCache + teardownFunc teardownFunc + ctx context.Context + cancel context.CancelFunc } type TestStruct struct { @@ -38,12 +40,16 @@ func (t *TestStruct) UnmarshalBinary(data []byte) error { } func (s *ReplicatedCacheTestSuite) SetupTest() { - s.cache, s.master, s.replicas, s.teardownFunc = CreateTestCache(s.T()) + var err error + s.replicatedCache, s.master, s.replicas, s.teardownFunc = CreateTestCache(s.T()) + s.cache, err = AddLocalCache(s.replicatedCache) + s.Require().NoError(err) } func (s *ReplicatedCacheTestSuite) TearDownTest() { s.teardownFunc() } + func (s *ReplicatedCacheTestSuite) SetupSuite() { s.ctx, s.cancel = context.WithTimeout(context.Background(), 30*time.Second) } @@ -54,8 +60,8 @@ func (s *ReplicatedCacheTestSuite) TearDownSuite() { func (s *ReplicatedCacheTestSuite) TestNewReplicatedCache() { s.Require().NotNil(s.cache) - s.Require().NotNil(s.cache.masterCache) - s.Require().Len(s.cache.replicaCaches, len(s.replicas)) + s.Require().NotNil(s.replicatedCache.masterCache) + s.Require().Len(s.replicatedCache.replicaCaches, len(s.replicas)) } func (s *ReplicatedCacheTestSuite) TestSet() { diff --git a/server/server.go b/server/server.go index 219d6241..37bcf28b 100644 --- a/server/server.go +++ b/server/server.go @@ -31,9 +31,9 @@ func NewServer(address string, sdkRouter *sdkrouter.Router, rOpts *api.RoutesOpt api.InstallRoutes(r, sdkRouter, rOpts) r.Use(monitor.ErrorLoggingMiddleware) r.Use(defaultHeadersMiddleware(map[string]string{ - "Server": "api.lbry.tv", + "Server": "api.odysee.com", "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Headers": "content-type", // Needed this to get any request to work + "Access-Control-Allow-Headers": "content-type", })) return &Server{