Skip to content

Commit

Permalink
Move metrics tracking to caller, add observation to asynquery failures
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 5, 2024
1 parent 8e2ff36 commit 6943b0e
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 69 deletions.
23 changes: 15 additions & 8 deletions app/asynquery/asynquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/internal/tasks"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/OdyseeTeam/odysee-api/pkg/logging"
Expand Down Expand Up @@ -232,8 +233,9 @@ func (m *CallManager) robustCall(ctx context.Context, aq *models.Asynquery, patc
log.Info("error getting sdk address for user")
return asynq.SkipRetry
}
sdkAddress := u.R.LbrynetServer.Address

caller := query.NewCaller(u.R.LbrynetServer.Address, aq.UserID)
caller := query.NewCaller(sdkAddress, aq.UserID)

t := time.Now()

Expand Down Expand Up @@ -261,18 +263,23 @@ func (m *CallManager) robustCall(ctx context.Context, aq *models.Asynquery, patc
}
delete(pp, "file_path")

res, err := caller.Call(context.TODO(), request)
ctx = query.AttachOrigin(ctx, "asynquery")
res, err := caller.Call(ctx, request)
metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingQuery).Observe(float64(time.Since(t)))

QueriesSent.Inc()

if err != nil {
QueriesFailed.Inc()
log.Warn(sdkNetError.Error(), "err", err)
resErrMsg := err.Error()
log.Warn("asynquery failed", "err", resErrMsg)
m.finalizeQueryRecord(ctx, aq.ID, nil, err.Error())
log.Warn("asynquery request failed", "err", err)
monitor.ErrorToSentry(fmt.Errorf("asynquery request failed: %w", err), map[string]string{
"user_id": fmt.Sprintf("%d", u.ID),
"endpoint": sdkAddress,
"method": request.Method,
})

err := m.finalizeQueryRecord(ctx, aq.ID, nil, err.Error())
if err != nil {
log.Warn("failed to finalize query record", "err", err)
log.Warn("failed to finalize asynquery record", "err", err)
}
return sdkNetError
}
Expand Down
2 changes: 1 addition & 1 deletion app/geopublish/geopublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *qu
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
q := query.GetFromContext(ctx)
q := query.QueryFromContext(ctx)
params := q.ParamsAsMap()
params[fileNameParam] = filename
q.Request.Params = params
Expand Down
4 changes: 0 additions & 4 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ func Handle(w http.ResponseWriter, r *http.Request) {

c.Cache = qCache

metrics.ProxyCallCounter.WithLabelValues(rpcReq.Method, c.Endpoint(), origin).Inc()
rpcRes, err := c.Call(r.Context(), rpcReq)
metrics.ProxyCallDurations.WithLabelValues(rpcReq.Method, c.Endpoint(), origin).Observe(c.Duration)

if err != nil {
writeResponse(w, rpcerrors.ToJSON(err))
Expand All @@ -154,8 +152,6 @@ func Handle(w http.ResponseWriter, r *http.Request) {
}
monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)})
observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet)
metrics.ProxyCallFailedDurations.WithLabelValues(rpcReq.Method, c.Endpoint(), origin, metrics.FailureKindNet).Observe(c.Duration)
metrics.ProxyCallFailedCounter.WithLabelValues(rpcReq.Method, c.Endpoint(), origin, metrics.FailureKindNet).Inc()
logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq)
return
}
Expand Down
2 changes: 1 addition & 1 deletion app/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *qu
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
q := query.GetFromContext(ctx)
q := query.QueryFromContext(ctx)
params := q.ParamsAsMap()
params[fileNameParam] = filename
q.Request.Params = params
Expand Down
57 changes: 19 additions & 38 deletions app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ const (
AllMethodsHook = ""
)

type contextKey string

var (
contextKeyQuery = contextKey("query")
contextKeyResponse = contextKey("response")
contextKeyLogEntry = contextKey("log-entry")
)

type HTTPRequester interface {
Do(req *http.Request) (res *http.Response, err error)
}
Expand All @@ -50,38 +42,13 @@ type HTTPRequester interface {
// Hooks can modify both query and response, as well as perform additional queries via supplied Caller.
// If nil is returned instead of *jsonrpc.RPCResponse, original response is returned.
type Hook func(*Caller, context.Context) (*jsonrpc.RPCResponse, error)

type hookEntry struct {
method string
function Hook
name string
}

func AttachToContext(ctx context.Context, query *Query) context.Context {
return context.WithValue(ctx, contextKeyQuery, query)
}

func GetFromContext(ctx context.Context) *Query {
return ctx.Value(contextKeyQuery).(*Query)
}

func WithResponse(ctx context.Context, response *jsonrpc.RPCResponse) context.Context {
return context.WithValue(ctx, contextKeyResponse, response)
}

func GetResponse(ctx context.Context) *jsonrpc.RPCResponse {
return ctx.Value(contextKeyResponse).(*jsonrpc.RPCResponse)
}

func WithLogEntry(ctx context.Context, entry *logrus.Entry) context.Context {
return context.WithValue(ctx, contextKeyLogEntry, entry)
}

// WithLogField injects additional data into default post-query log entry
func WithLogField(ctx context.Context, key string, value interface{}) {
e := ctx.Value(contextKeyLogEntry).(*logrus.Entry)
e.Data[key] = value
}

// Caller patches through JSON-RPC requests from clients, doing pre/post-processing,
// account processing and validation.
type Caller struct {
Expand Down Expand Up @@ -189,6 +156,20 @@ func (c *Caller) Endpoint() string {
// Call method forwards a JSON-RPC request to the lbrynet server.
// It returns a response that is ready to be sent back to the JSON-RPC client as is.
func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
origin := OriginFromContext(ctx)
metrics.ProxyCallCounter.WithLabelValues(req.Method, c.Endpoint(), origin).Inc()
res, err := c.call(ctx, req)
metrics.ProxyCallDurations.WithLabelValues(req.Method, c.Endpoint(), origin).Observe(c.Duration)
if err != nil {
metrics.ProxyCallFailedDurations.WithLabelValues(req.Method, c.Endpoint(), origin, metrics.FailureKindNet).Observe(c.Duration)
metrics.ProxyCallFailedCounter.WithLabelValues(req.Method, c.Endpoint(), origin, metrics.FailureKindNet).Inc()
}
return res, err
}

// Call method forwards a JSON-RPC request to the lbrynet server.
// It returns a response that is ready to be sent back to the JSON-RPC client as is.
func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
if c.endpoint == "" {
return nil, errors.Err("cannot call blank endpoint")
}
Expand All @@ -205,7 +186,7 @@ func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP

// Applying preflight hooks
var res *jsonrpc.RPCResponse
ctx = AttachToContext(ctx, q)
ctx = AttachQuery(ctx, q)
for _, hook := range c.preflightHooks {
if isMatchingHook(q.Method(), hook) {
res, err = hook.function(c, ctx)
Expand Down Expand Up @@ -253,7 +234,7 @@ func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse,
start := time.Now()
client := c.getRPCClient(q.Method())
r, err = client.CallRaw(q.Request)
c.Duration = time.Since(start).Seconds()
c.Duration += time.Since(start).Seconds()
logger.Log().Debugf("sent request: %s %+v (%.2fs)", q.Method(), q.Params(), c.Duration)

// Generally a HTTP transport failure (connect error etc)
Expand Down Expand Up @@ -305,8 +286,8 @@ func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse,

// Applying postflight hooks
var hookResp *jsonrpc.RPCResponse
ctx = WithLogEntry(ctx, logEntry)
ctx = WithResponse(ctx, r)
ctx = AttachLogEntry(ctx, logEntry)
ctx = AttachResponse(ctx, r)
for _, hook := range c.postflightHooks {
if isMatchingHook(q.Method(), hook) {
hookResp, err = hook.function(c, ctx)
Expand Down
20 changes: 10 additions & 10 deletions app/query/caller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ func TestCaller_AddPreflightHookAmendingQueryParams(t *testing.T) {
c := NewCaller(srv.URL, 0)

c.AddPreflightHook(relaxedMethods[0], func(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
params := GetFromContext(ctx).ParamsAsMap()
params := QueryFromContext(ctx).ParamsAsMap()
if params == nil {
GetFromContext(ctx).Request.Params = map[string]string{"param": "123"}
QueryFromContext(ctx).Request.Params = map[string]string{"param": "123"}
} else {
params["param"] = "123"
GetFromContext(ctx).Request.Params = params
QueryFromContext(ctx).Request.Params = params
}
return nil, nil
}, "")
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestCaller_AddPostflightHook_Response(t *testing.T) {
`

c.AddPostflightHook("wallet_", func(c *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
r := GetResponse(ctx)
r := ResponseFromContext(ctx)
r.Result = "0.0"
return r, nil
}, "")
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestCaller_CloneWithoutHook(t *testing.T) {
c.AddPostflightHook(MethodResolve, func(c *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
// This will be cloned without the current hook but the previous one should increment `timesCalled` once again
cc := c.CloneWithoutHook(c.Endpoint(), MethodResolve, "lbrynext_resolve")
q := GetFromContext(ctx)
q := QueryFromContext(ctx)
_, err := cc.SendQuery(ctx, q)
assert.NoError(t, err)
return nil, nil
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestCaller_CallQueryWithRetry(t *testing.T) {
// check that sdk loads the wallet and retries the query if the wallet was not initially loaded

c := NewCaller(addr, dummyUserID)
r, err := c.SendQuery(AttachToContext(bgctx(), q), q)
r, err := c.SendQuery(AttachQuery(bgctx(), q), q)
require.NoError(t, err)
require.Nil(t, r.Error)
}
Expand Down Expand Up @@ -594,10 +594,10 @@ func TestCaller_timeouts(t *testing.T) {
})
}()

_, err = c.SendQuery(AttachToContext(bgctx(), q), q)
_, err = c.SendQuery(AttachQuery(bgctx(), q), q)
require.NoError(t, err)

_, err = c.SendQuery(AttachToContext(bgctx(), q), q)
_, err = c.SendQuery(AttachQuery(bgctx(), q), q)
require.Error(t, err, `timeout awaiting response headers`)
}

Expand Down Expand Up @@ -634,7 +634,7 @@ func TestCaller_DontReloadWalletAfterOtherErrors(t *testing.T) {
}),
)

r, err := c.SendQuery(AttachToContext(bgctx(), q), q)
r, err := c.SendQuery(AttachQuery(bgctx(), q), q)
require.NoError(t, err)
require.Equal(t, "Couldn't find wallet: //", r.Error.Message)
}
Expand Down Expand Up @@ -676,7 +676,7 @@ func TestCaller_DontReloadWalletIfAlreadyLoaded(t *testing.T) {
}),
)

r, err := c.SendQuery(AttachToContext(bgctx(), q), q)
r, err := c.SendQuery(AttachQuery(bgctx(), q), q)

require.NoError(t, err)
require.Nil(t, r.Error)
Expand Down
54 changes: 54 additions & 0 deletions app/query/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package query

import (
"context"

"github.com/sirupsen/logrus"
"github.com/ybbus/jsonrpc"
)

type contextKey string

var (
contextKeyQuery = contextKey("query")
contextKeyResponse = contextKey("response")
contextKeyLogEntry = contextKey("log-entry")
contextKeyOrigin = contextKey("origin")
)

func AttachQuery(ctx context.Context, query *Query) context.Context {
return context.WithValue(ctx, contextKeyQuery, query)
}

func QueryFromContext(ctx context.Context) *Query {
return ctx.Value(contextKeyQuery).(*Query)
}

func AttachResponse(ctx context.Context, response *jsonrpc.RPCResponse) context.Context {
return context.WithValue(ctx, contextKeyResponse, response)
}

func ResponseFromContext(ctx context.Context) *jsonrpc.RPCResponse {
return ctx.Value(contextKeyResponse).(*jsonrpc.RPCResponse)
}

func AttachLogEntry(ctx context.Context, entry *logrus.Entry) context.Context {
return context.WithValue(ctx, contextKeyLogEntry, entry)
}

// WithLogField injects additional data into default post-query log entry
func WithLogField(ctx context.Context, key string, value interface{}) {
e := ctx.Value(contextKeyLogEntry).(*logrus.Entry)
e.Data[key] = value
}

func AttachOrigin(ctx context.Context, origin string) context.Context {
return context.WithValue(ctx, contextKeyOrigin, origin)
}

func OriginFromContext(ctx context.Context) string {
if origin, ok := ctx.Value(contextKeyOrigin).(string); ok {
return origin
}
return ""
}
14 changes: 7 additions & 7 deletions app/query/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func preflightHookGet(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse
contentURL, metricLabel string
isPaidStream bool
)
query := GetFromContext(ctx)
query := QueryFromContext(ctx)

response := &jsonrpc.RPCResponse{
ID: query.Request.ID,
Expand Down Expand Up @@ -193,7 +193,7 @@ func preflightHookGet(caller *Caller, ctx context.Context) (*jsonrpc.RPCResponse
if err != nil {
return nil, err
}
purchaseRes, err := caller.SendQuery(AttachToContext(ctx, purchaseQuery), purchaseQuery)
purchaseRes, err := caller.SendQuery(AttachQuery(ctx, purchaseQuery), purchaseQuery)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func checkStreamAccess(ctx context.Context, claim *ljsonrpc.Claim) (bool, error)
accessType, environ string
)

params := GetFromContext(ctx).ParamsAsMap()
params := QueryFromContext(ctx).ParamsAsMap()
_, isLivestream := params["base_streaming_url"]
if p, ok := params[iapi.ParamEnviron]; ok {
environ, _ = p.(string)
Expand Down Expand Up @@ -441,7 +441,7 @@ func resolve(ctx context.Context, c *Caller, q *Query, url string) (*ljsonrpc.Cl

// preflightHookClaimSearch patches tag parameters of RPC request to support scheduled and unlisted content.
func preflightHookClaimSearch(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
query := GetFromContext(ctx)
query := QueryFromContext(ctx)
origParams := query.ParamsAsMap()
params := &ClaimSearchParams{}
err := decode(origParams, params)
Expand Down Expand Up @@ -474,7 +474,7 @@ func preflightHookClaimSearch(_ *Caller, ctx context.Context) (*jsonrpc.RPCRespo
func postClaimSearchArfleetThumbs(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
logger := zapadapter.NewKV(nil).With("module", "query.preprocessors")
baseUrl := config.GetArfleetCDN()
resp := GetResponse(ctx)
resp := ResponseFromContext(ctx)
pRes, err := arweave.ReplaceAssetUrls(baseUrl, resp.Result, "items", "value.thumbnail.url")
if err != nil {
logger.Warn("error replacing asset urls", "err", err)
Expand All @@ -488,7 +488,7 @@ func postResolveArfleetThumbs(_ *Caller, ctx context.Context) (*jsonrpc.RPCRespo
logger := zapadapter.NewKV(nil).With("module", "query.preprocessors")
baseUrl := config.GetArfleetCDN()

resp := GetResponse(ctx)
resp := ResponseFromContext(ctx)
claims, ok := resp.Result.(map[string]any)
if !ok {
logger.Warn("error processing resolve response", "result", resp.Result)
Expand Down Expand Up @@ -604,7 +604,7 @@ func getStatusResponse(_ *Caller, ctx context.Context) (*jsonrpc.RPCResponse, er
}
`
json.Unmarshal([]byte(rawResponse), &response)
rpcResponse := GetFromContext(ctx).newResponse()
rpcResponse := QueryFromContext(ctx).newResponse()
rpcResponse.Result = response
return rpcResponse, nil
}
Expand Down

0 comments on commit 6943b0e

Please sign in to comment.