diff --git a/app/asynquery/asynquery.go b/app/asynquery/asynquery.go index 98fbecd8..2010d489 100644 --- a/app/asynquery/asynquery.go +++ b/app/asynquery/asynquery.go @@ -13,6 +13,7 @@ import ( "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics" "github.com/OdyseeTeam/odysee-api/app/query" + "github.com/OdyseeTeam/odysee-api/internal/monitor" "github.com/OdyseeTeam/odysee-api/internal/tasks" "github.com/OdyseeTeam/odysee-api/models" "github.com/OdyseeTeam/odysee-api/pkg/logging" @@ -232,8 +233,9 @@ func (m *CallManager) robustCall(ctx context.Context, aq *models.Asynquery, patc log.Info("error getting sdk address for user") return asynq.SkipRetry } + sdkAddress := u.R.LbrynetServer.Address - caller := query.NewCaller(u.R.LbrynetServer.Address, aq.UserID) + caller := query.NewCaller(sdkAddress, aq.UserID) t := time.Now() @@ -261,18 +263,23 @@ func (m *CallManager) robustCall(ctx context.Context, aq *models.Asynquery, patc } delete(pp, "file_path") - res, err := caller.Call(context.TODO(), request) + ctx = query.AttachOrigin(ctx, "asynquery") + res, err := caller.Call(ctx, request) metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingQuery).Observe(float64(time.Since(t))) - QueriesSent.Inc() + if err != nil { QueriesFailed.Inc() - log.Warn(sdkNetError.Error(), "err", err) - resErrMsg := err.Error() - log.Warn("asynquery failed", "err", resErrMsg) - m.finalizeQueryRecord(ctx, aq.ID, nil, err.Error()) + log.Warn("asynquery request failed", "err", err) + monitor.ErrorToSentry(fmt.Errorf("asynquery request failed: %w", err), map[string]string{ + "user_id": fmt.Sprintf("%d", u.ID), + "endpoint": sdkAddress, + "method": request.Method, + }) + + err := m.finalizeQueryRecord(ctx, aq.ID, nil, err.Error()) if err != nil { - log.Warn("failed to finalize query record", "err", err) + log.Warn("failed to finalize asynquery record", "err", err) } return sdkNetError } diff --git a/app/geopublish/geopublish.go b/app/geopublish/geopublish.go index c8dc68c0..a0f9a66a 100644 --- a/app/geopublish/geopublish.go +++ b/app/geopublish/geopublish.go @@ -426,7 +426,7 @@ func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *qu c := query.NewCaller(sdkAddress, userID) c.Cache = qCache c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - q := query.GetFromContext(ctx) + q := query.QueryFromContext(ctx) params := q.ParamsAsMap() params[fileNameParam] = filename q.Request.Params = params diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index 52b05c85..599ac86f 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -136,9 +136,7 @@ func Handle(w http.ResponseWriter, r *http.Request) { c.Cache = qCache - metrics.ProxyCallCounter.WithLabelValues(rpcReq.Method, c.Endpoint(), origin).Inc() rpcRes, err := c.Call(r.Context(), rpcReq) - metrics.ProxyCallDurations.WithLabelValues(rpcReq.Method, c.Endpoint(), origin).Observe(c.Duration) if err != nil { writeResponse(w, rpcerrors.ToJSON(err)) @@ -154,8 +152,6 @@ func Handle(w http.ResponseWriter, r *http.Request) { } monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)}) observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet) - metrics.ProxyCallFailedDurations.WithLabelValues(rpcReq.Method, c.Endpoint(), origin, metrics.FailureKindNet).Observe(c.Duration) - metrics.ProxyCallFailedCounter.WithLabelValues(rpcReq.Method, c.Endpoint(), origin, metrics.FailureKindNet).Inc() logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq) return } diff --git a/app/publish/publish.go b/app/publish/publish.go index f945ec09..cb700dc1 100644 --- a/app/publish/publish.go +++ b/app/publish/publish.go @@ -227,7 +227,7 @@ func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *qu c := query.NewCaller(sdkAddress, userID) c.Cache = qCache c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - q := query.GetFromContext(ctx) + q := query.QueryFromContext(ctx) params := q.ParamsAsMap() params[fileNameParam] = filename q.Request.Params = params diff --git a/app/query/caller.go b/app/query/caller.go index 0154f334..02c439d1 100644 --- a/app/query/caller.go +++ b/app/query/caller.go @@ -33,14 +33,6 @@ const ( AllMethodsHook = "" ) -type contextKey string - -var ( - contextKeyQuery = contextKey("query") - contextKeyResponse = contextKey("response") - contextKeyLogEntry = contextKey("log-entry") -) - type HTTPRequester interface { Do(req *http.Request) (res *http.Response, err error) } @@ -50,38 +42,13 @@ type HTTPRequester interface { // Hooks can modify both query and response, as well as perform additional queries via supplied Caller. // If nil is returned instead of *jsonrpc.RPCResponse, original response is returned. type Hook func(*Caller, context.Context) (*jsonrpc.RPCResponse, error) + type hookEntry struct { method string function Hook name string } -func AttachToContext(ctx context.Context, query *Query) context.Context { - return context.WithValue(ctx, contextKeyQuery, query) -} - -func GetFromContext(ctx context.Context) *Query { - return ctx.Value(contextKeyQuery).(*Query) -} - -func WithResponse(ctx context.Context, response *jsonrpc.RPCResponse) context.Context { - return context.WithValue(ctx, contextKeyResponse, response) -} - -func GetResponse(ctx context.Context) *jsonrpc.RPCResponse { - return ctx.Value(contextKeyResponse).(*jsonrpc.RPCResponse) -} - -func WithLogEntry(ctx context.Context, entry *logrus.Entry) context.Context { - return context.WithValue(ctx, contextKeyLogEntry, entry) -} - -// WithLogField injects additional data into default post-query log entry -func WithLogField(ctx context.Context, key string, value interface{}) { - e := ctx.Value(contextKeyLogEntry).(*logrus.Entry) - e.Data[key] = value -} - // Caller patches through JSON-RPC requests from clients, doing pre/post-processing, // account processing and validation. type Caller struct { @@ -189,6 +156,20 @@ func (c *Caller) Endpoint() string { // Call method forwards a JSON-RPC request to the lbrynet server. // It returns a response that is ready to be sent back to the JSON-RPC client as is. func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) { + origin := OriginFromContext(ctx) + metrics.ProxyCallCounter.WithLabelValues(req.Method, c.Endpoint(), origin).Inc() + res, err := c.call(ctx, req) + metrics.ProxyCallDurations.WithLabelValues(req.Method, c.Endpoint(), origin).Observe(c.Duration) + if err != nil { + metrics.ProxyCallFailedDurations.WithLabelValues(req.Method, c.Endpoint(), origin, metrics.FailureKindNet).Observe(c.Duration) + metrics.ProxyCallFailedCounter.WithLabelValues(req.Method, c.Endpoint(), origin, metrics.FailureKindNet).Inc() + } + return res, err +} + +// Call method forwards a JSON-RPC request to the lbrynet server. +// It returns a response that is ready to be sent back to the JSON-RPC client as is. +func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) { if c.endpoint == "" { return nil, errors.Err("cannot call blank endpoint") } @@ -205,7 +186,7 @@ func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP // Applying preflight hooks var res *jsonrpc.RPCResponse - ctx = AttachToContext(ctx, q) + ctx = AttachQuery(ctx, q) for _, hook := range c.preflightHooks { if isMatchingHook(q.Method(), hook) { res, err = hook.function(c, ctx) @@ -253,7 +234,7 @@ func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse, start := time.Now() client := c.getRPCClient(q.Method()) r, err = client.CallRaw(q.Request) - c.Duration = time.Since(start).Seconds() + c.Duration += time.Since(start).Seconds() logger.Log().Debugf("sent request: %s %+v (%.2fs)", q.Method(), q.Params(), c.Duration) // Generally a HTTP transport failure (connect error etc) @@ -305,8 +286,8 @@ func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse, // Applying postflight hooks var hookResp *jsonrpc.RPCResponse - ctx = WithLogEntry(ctx, logEntry) - ctx = WithResponse(ctx, r) + ctx = AttachLogEntry(ctx, logEntry) + ctx = AttachResponse(ctx, r) for _, hook := range c.postflightHooks { if isMatchingHook(q.Method(), hook) { hookResp, err = hook.function(c, ctx) diff --git a/app/query/caller_test.go b/app/query/caller_test.go index f1e29274..922eaf40 100644 --- a/app/query/caller_test.go +++ b/app/query/caller_test.go @@ -220,12 +220,12 @@ func TestCaller_AddPreflightHookAmendingQueryParams(t *testing.T) { c := NewCaller(srv.URL, 0) c.AddPreflightHook(relaxedMethods[0], func(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - params := GetFromContext(ctx).ParamsAsMap() + params := QueryFromContext(ctx).ParamsAsMap() if params == nil { - GetFromContext(ctx).Request.Params = map[string]string{"param": "123"} + QueryFromContext(ctx).Request.Params = map[string]string{"param": "123"} } else { params["param"] = "123" - GetFromContext(ctx).Request.Params = params + QueryFromContext(ctx).Request.Params = params } return nil, nil }, "") @@ -306,7 +306,7 @@ func TestCaller_AddPostflightHook_Response(t *testing.T) { ` c.AddPostflightHook("wallet_", func(c *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - r := GetResponse(ctx) + r := ResponseFromContext(ctx) r.Result = "0.0" return r, nil }, "") @@ -368,7 +368,7 @@ func TestCaller_CloneWithoutHook(t *testing.T) { c.AddPostflightHook(MethodResolve, func(c *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { // This will be cloned without the current hook but the previous one should increment `timesCalled` once again cc := c.CloneWithoutHook(c.Endpoint(), MethodResolve, "lbrynext_resolve") - q := GetFromContext(ctx) + q := QueryFromContext(ctx) _, err := cc.SendQuery(ctx, q) assert.NoError(t, err) return nil, nil @@ -564,7 +564,7 @@ func TestCaller_CallQueryWithRetry(t *testing.T) { // check that sdk loads the wallet and retries the query if the wallet was not initially loaded c := NewCaller(addr, dummyUserID) - r, err := c.SendQuery(AttachToContext(bgctx(), q), q) + r, err := c.SendQuery(AttachQuery(bgctx(), q), q) require.NoError(t, err) require.Nil(t, r.Error) } @@ -594,10 +594,10 @@ func TestCaller_timeouts(t *testing.T) { }) }() - _, err = c.SendQuery(AttachToContext(bgctx(), q), q) + _, err = c.SendQuery(AttachQuery(bgctx(), q), q) require.NoError(t, err) - _, err = c.SendQuery(AttachToContext(bgctx(), q), q) + _, err = c.SendQuery(AttachQuery(bgctx(), q), q) require.Error(t, err, `timeout awaiting response headers`) } @@ -634,7 +634,7 @@ func TestCaller_DontReloadWalletAfterOtherErrors(t *testing.T) { }), ) - r, err := c.SendQuery(AttachToContext(bgctx(), q), q) + r, err := c.SendQuery(AttachQuery(bgctx(), q), q) require.NoError(t, err) require.Equal(t, "Couldn't find wallet: //", r.Error.Message) } @@ -676,7 +676,7 @@ func TestCaller_DontReloadWalletIfAlreadyLoaded(t *testing.T) { }), ) - r, err := c.SendQuery(AttachToContext(bgctx(), q), q) + r, err := c.SendQuery(AttachQuery(bgctx(), q), q) require.NoError(t, err) require.Nil(t, r.Error) diff --git a/app/query/context.go b/app/query/context.go new file mode 100644 index 00000000..03db3edc --- /dev/null +++ b/app/query/context.go @@ -0,0 +1,54 @@ +package query + +import ( + "context" + + "github.com/sirupsen/logrus" + "github.com/ybbus/jsonrpc" +) + +type contextKey string + +var ( + contextKeyQuery = contextKey("query") + contextKeyResponse = contextKey("response") + contextKeyLogEntry = contextKey("log-entry") + contextKeyOrigin = contextKey("origin") +) + +func AttachQuery(ctx context.Context, query *Query) context.Context { + return context.WithValue(ctx, contextKeyQuery, query) +} + +func QueryFromContext(ctx context.Context) *Query { + return ctx.Value(contextKeyQuery).(*Query) +} + +func AttachResponse(ctx context.Context, response *jsonrpc.RPCResponse) context.Context { + return context.WithValue(ctx, contextKeyResponse, response) +} + +func ResponseFromContext(ctx context.Context) *jsonrpc.RPCResponse { + return ctx.Value(contextKeyResponse).(*jsonrpc.RPCResponse) +} + +func AttachLogEntry(ctx context.Context, entry *logrus.Entry) context.Context { + return context.WithValue(ctx, contextKeyLogEntry, entry) +} + +// WithLogField injects additional data into default post-query log entry +func WithLogField(ctx context.Context, key string, value interface{}) { + e := ctx.Value(contextKeyLogEntry).(*logrus.Entry) + e.Data[key] = value +} + +func AttachOrigin(ctx context.Context, origin string) context.Context { + return context.WithValue(ctx, contextKeyOrigin, origin) +} + +func OriginFromContext(ctx context.Context) string { + if origin, ok := ctx.Value(contextKeyOrigin).(string); ok { + return origin + } + return "" +} diff --git a/app/query/processors.go b/app/query/processors.go index 6b98d3f8..39bfe4ee 100644 --- a/app/query/processors.go +++ b/app/query/processors.go @@ -108,7 +108,7 @@ func preflightHookGet(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse contentURL, metricLabel string isPaidStream bool ) - query := GetFromContext(ctx) + query := QueryFromContext(ctx) response := &jsonrpc.RPCResponse{ ID: query.Request.ID, @@ -193,7 +193,7 @@ func preflightHookGet(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse if err != nil { return nil, err } - purchaseRes, err := caller.SendQuery(AttachToContext(ctx, purchaseQuery), purchaseQuery) + purchaseRes, err := caller.SendQuery(AttachQuery(ctx, purchaseQuery), purchaseQuery) if err != nil { return nil, err } @@ -282,7 +282,7 @@ func checkStreamAccess(ctx context.Context, claim *ljsonrpc.Claim) (bool, error) accessType, environ string ) - params := GetFromContext(ctx).ParamsAsMap() + params := QueryFromContext(ctx).ParamsAsMap() _, isLivestream := params["base_streaming_url"] if p, ok := params[iapi.ParamEnviron]; ok { environ, _ = p.(string) @@ -441,7 +441,7 @@ func resolve(ctx context.Context, c *Caller, q *Query, url string) (*ljsonrpc.Cl // preflightHookClaimSearch patches tag parameters of RPC request to support scheduled and unlisted content. func preflightHookClaimSearch(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { - query := GetFromContext(ctx) + query := QueryFromContext(ctx) origParams := query.ParamsAsMap() params := &ClaimSearchParams{} err := decode(origParams, params) @@ -474,7 +474,7 @@ func preflightHookClaimSearch(_ *Caller, ctx context.Context) (*jsonrpc.RPCRespo func postClaimSearchArfleetThumbs(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) { logger := zapadapter.NewKV(nil).With("module", "query.preprocessors") baseUrl := config.GetArfleetCDN() - resp := GetResponse(ctx) + resp := ResponseFromContext(ctx) pRes, err := arweave.ReplaceAssetUrls(baseUrl, resp.Result, "items", "value.thumbnail.url") if err != nil { logger.Warn("error replacing asset urls", "err", err) @@ -488,7 +488,7 @@ func postResolveArfleetThumbs(_ *Caller, ctx context.Context) (*jsonrpc.RPCRespo logger := zapadapter.NewKV(nil).With("module", "query.preprocessors") baseUrl := config.GetArfleetCDN() - resp := GetResponse(ctx) + resp := ResponseFromContext(ctx) claims, ok := resp.Result.(map[string]any) if !ok { logger.Warn("error processing resolve response", "result", resp.Result) @@ -604,7 +604,7 @@ func getStatusResponse(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, er } ` json.Unmarshal([]byte(rawResponse), &response) - rpcResponse := GetFromContext(ctx).newResponse() + rpcResponse := QueryFromContext(ctx).newResponse() rpcResponse.Result = response return rpcResponse, nil }