diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1aa413f7..a58994c1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,6 +16,10 @@ jobs: run: make build - name: Test with the Go CLI run: make test + - name: Run Gosec Security Scanner + uses: securego/gosec@master + with: + args: -exclude-dir=test -tests=false ./... - name: Generate coverage report run: make coverage - name: Upload coverage reports to Codecov diff --git a/common/config.go b/common/config.go index ac11435c..008d021d 100644 --- a/common/config.go +++ b/common/config.go @@ -213,11 +213,11 @@ type RetryPolicyConfig struct { } type CircuitBreakerPolicyConfig struct { - FailureThresholdCount int `yaml:"failureThresholdCount" json:"failureThresholdCount"` - FailureThresholdCapacity int `yaml:"failureThresholdCapacity" json:"failureThresholdCapacity"` + FailureThresholdCount uint `yaml:"failureThresholdCount" json:"failureThresholdCount"` + FailureThresholdCapacity uint `yaml:"failureThresholdCapacity" json:"failureThresholdCapacity"` HalfOpenAfter string `yaml:"halfOpenAfter" json:"halfOpenAfter"` - SuccessThresholdCount int `yaml:"successThresholdCount" json:"successThresholdCount"` - SuccessThresholdCapacity int `yaml:"successThresholdCapacity" json:"successThresholdCapacity"` + SuccessThresholdCount uint `yaml:"successThresholdCount" json:"successThresholdCount"` + SuccessThresholdCapacity uint `yaml:"successThresholdCapacity" json:"successThresholdCapacity"` } type TimeoutPolicyConfig struct { @@ -240,7 +240,7 @@ type RateLimitBudgetConfig struct { type RateLimitRuleConfig struct { Method string `yaml:"method" json:"method"` - MaxCount int `yaml:"maxCount" json:"maxCount"` + MaxCount uint `yaml:"maxCount" json:"maxCount"` Period string `yaml:"period" json:"period"` WaitTime string `yaml:"waitTime" json:"waitTime"` } @@ -367,7 +367,7 @@ func (c *Config) GetProjectConfig(projectId string) *ProjectConfig { func (c *RateLimitRuleConfig) MarshalZerologObject(e *zerolog.Event) { e.Str("method", c.Method). - Int("maxCount", c.MaxCount). + Uint("maxCount", c.MaxCount). Str("period", c.Period). Str("waitTime", c.WaitTime) } diff --git a/common/request.go b/common/request.go index bbb85a79..9060c965 100644 --- a/common/request.go +++ b/common/request.go @@ -167,10 +167,10 @@ func (r *NormalizedRequest) ApplyDirectivesFromHttp( queryArgs *fasthttp.Args, ) { drc := &RequestDirectives{ - RetryEmpty: string(headers.Peek("X-ERPC-Retry-Empty")) != "false", - RetryPending: string(headers.Peek("X-ERPC-Retry-Pending")) != "false", - SkipCacheRead: string(headers.Peek("X-ERPC-Skip-Cache-Read")) == "true", - UseUpstream: string(headers.Peek("X-ERPC-Use-Upstream")), + RetryEmpty: string(headers.Peek("X-ERPC-Retry-Empty")) != "false", + RetryPending: string(headers.Peek("X-ERPC-Retry-Pending")) != "false", + SkipCacheRead: string(headers.Peek("X-ERPC-Skip-Cache-Read")) == "true", + UseUpstream: string(headers.Peek("X-ERPC-Use-Upstream")), } if useUpstream := string(queryArgs.Peek("use-upstream")); useUpstream != "" { @@ -235,7 +235,7 @@ func (r *NormalizedRequest) JsonRpcRequest() (*JsonRpcRequest, error) { } if rpcReq.ID == nil { - rpcReq.ID = float64(rand.Intn(math.MaxInt32)) + rpcReq.ID = rand.Intn(math.MaxInt32) // #nosec G404 } r.jsonRpcRequest = rpcReq diff --git a/data/redis.go b/data/redis.go index 87864702..2eef6bce 100644 --- a/data/redis.go +++ b/data/redis.go @@ -90,7 +90,7 @@ func (r *RedisConnector) connect(ctx context.Context, cfg *common.RedisConnector func createTLSConfig(tlsCfg *common.TLSConfig) (*tls.Config, error) { config := &tls.Config{ - InsecureSkipVerify: tlsCfg.InsecureSkipVerify, + InsecureSkipVerify: tlsCfg.InsecureSkipVerify, // #nosec G402 } if tlsCfg.CertFile != "" && tlsCfg.KeyFile != "" { diff --git a/erpc/http_server.go b/erpc/http_server.go index 62de06ac..9468b90d 100644 --- a/erpc/http_server.go +++ b/erpc/http_server.go @@ -266,12 +266,23 @@ func (s *HttpServer) createRequestHandler(mainCtx context.Context, reqMaxTimeout if isBatch { fastCtx.SetStatusCode(fasthttp.StatusOK) - encoder.Encode(responses) + err = encoder.Encode(responses) + if err != nil { + fastCtx.SetStatusCode(fasthttp.StatusInternalServerError) + fastCtx.Response.Header.Set("Content-Type", "application/json") + fastCtx.SetBodyString(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32603,"message":"%s"}}`, err.Error())) + return + } } else { res := responses[0] setResponseHeaders(res, fastCtx) setResponseStatusCode(res, fastCtx) - encoder.Encode(res) + err = encoder.Encode(res) + if err != nil { + fastCtx.SetStatusCode(fasthttp.StatusInternalServerError) + fastCtx.SetBodyString(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32603,"message":"%s"}}`, err.Error())) + return + } } fastCtx.SetBody(buf.Bytes()) @@ -450,9 +461,16 @@ func decideErrorStatusCode(err error) int { func handleErrorResponse(logger *zerolog.Logger, nq *common.NormalizedRequest, err error, ctx *fasthttp.RequestCtx, encoder sonic.Encoder, buf *bytes.Buffer) { resp := processErrorBody(logger, nq, err) setResponseStatusCode(err, ctx) - encoder.Encode(resp) - ctx.Response.Header.Set("Content-Type", "application/json") - ctx.SetBody(buf.Bytes()) + err = encoder.Encode(resp) + if err != nil { + logger.Error().Err(err).Msgf("failed to encode error response") + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.Response.Header.Set("Content-Type", "application/json") + ctx.SetBodyString(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32603,"message":"%s"}}`, err.Error())) + } else { + ctx.Response.Header.Set("Content-Type", "application/json") + ctx.SetBody(buf.Bytes()) + } } func (s *HttpServer) Start(logger *zerolog.Logger) error { @@ -476,7 +494,10 @@ func (s *HttpServer) Start(logger *zerolog.Logger) error { ln6, err = net.Listen("tcp6", addrV6) if err != nil { if ln4 != nil { - ln4.Close() + err := ln4.Close() + if err != nil { + logger.Error().Err(err).Msgf("failed to close IPv4 listener") + } } return fmt.Errorf("error listening on IPv6: %w", err) } diff --git a/erpc/http_server_test.go b/erpc/http_server_test.go index af4422f7..33e0dc70 100644 --- a/erpc/http_server_test.go +++ b/erpc/http_server_test.go @@ -182,7 +182,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { r.Status(200) r.JSON(map[string]interface{}{ "jsonrpc": "2.0", - "id": rand.Intn(100000000), + "id": rand.Intn(100_000_000), "result": map[string]interface{}{ "blockNumber": rand.Intn(100000000), }, @@ -522,7 +522,7 @@ func TestHttpServer_MultipleUpstreams(t *testing.T) { }, Upstreams: []*common.UpstreamConfig{ { - Id: "rpc1", + Id: "rpc1", Type: common.UpstreamTypeEvm, Endpoint: "http://rpc1.localhost", Evm: &common.EvmUpstreamConfig{ @@ -531,7 +531,7 @@ func TestHttpServer_MultipleUpstreams(t *testing.T) { VendorName: "llama", }, { - Id: "rpc2", + Id: "rpc2", Type: common.UpstreamTypeEvm, Endpoint: "http://rpc2.localhost", Evm: &common.EvmUpstreamConfig{ @@ -612,4 +612,4 @@ func TestHttpServer_MultipleUpstreams(t *testing.T) { assert.True(t, gock.IsDone(), "All mocks should have been called") }) -} \ No newline at end of file +} diff --git a/erpc/init.go b/erpc/init.go index b4a87785..477ca86a 100644 --- a/erpc/init.go +++ b/erpc/init.go @@ -86,8 +86,9 @@ func Init( addrV6 := fmt.Sprintf("%s:%d", cfg.Metrics.HostV6, cfg.Metrics.Port) logger.Info().Msgf("starting metrics server on port: %d addrV4: %s addrV6: %s", cfg.Metrics.Port, addrV4, addrV6) srv := &http.Server{ - Addr: fmt.Sprintf(":%d", cfg.Metrics.Port), - Handler: promhttp.Handler(), + Addr: fmt.Sprintf(":%d", cfg.Metrics.Port), + Handler: promhttp.Handler(), + ReadHeaderTimeout: 10 * time.Second, } go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { diff --git a/test/fake_server.go b/test/fake_server.go index 5336bc6c..da2a20f5 100644 --- a/test/fake_server.go +++ b/test/fake_server.go @@ -1,3 +1,4 @@ +// #nosec package test import ( diff --git a/upstream/envio_http_json_rpc_client.go b/upstream/envio_http_json_rpc_client.go index dae0c052..96d2276e 100644 --- a/upstream/envio_http_json_rpc_client.go +++ b/upstream/envio_http_json_rpc_client.go @@ -117,7 +117,7 @@ func (c *EnvioHttpJsonRpcClient) SupportsNetwork(networkId string) (bool, error) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - rid := rand.Intn(1000000) + rid := rand.Intn(100_000_000) // #nosec G404 pr := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"eth_chainId","params":[]}`, rid))) resp, err := client.SendRequest(ctx, pr) if err != nil { diff --git a/upstream/evm_state_poller.go b/upstream/evm_state_poller.go index 05435e5a..15da1c13 100644 --- a/upstream/evm_state_poller.go +++ b/upstream/evm_state_poller.go @@ -295,7 +295,7 @@ func (e *EvmStatePoller) fetchFinalizedBlockNumber(ctx context.Context) (int64, } func (e *EvmStatePoller) fetchBlock(ctx context.Context, blockTag string) (int64, error) { - randId := rand.Intn(10_000_000) + randId := rand.Intn(100_000_000) // #nosec G404 pr := common.NewNormalizedRequest([]byte( fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"eth_getBlockByNumber","params":["%s",false]}`, randId, blockTag), )) diff --git a/upstream/failsafe.go b/upstream/failsafe.go index c6054814..a7df7c3d 100644 --- a/upstream/failsafe.go +++ b/upstream/failsafe.go @@ -92,17 +92,17 @@ func createCircuitBreakerPolicy(logger *zerolog.Logger, component string, cfg *c if cfg.FailureThresholdCount > 0 { if cfg.FailureThresholdCapacity > 0 { - builder = builder.WithFailureThresholdRatio(uint(cfg.FailureThresholdCount), uint(cfg.FailureThresholdCapacity)) + builder = builder.WithFailureThresholdRatio(cfg.FailureThresholdCount, cfg.FailureThresholdCapacity) } else { - builder = builder.WithFailureThreshold(uint(cfg.FailureThresholdCount)) + builder = builder.WithFailureThreshold(cfg.FailureThresholdCount) } } if cfg.SuccessThresholdCount > 0 { if cfg.SuccessThresholdCapacity > 0 { - builder = builder.WithSuccessThresholdRatio(uint(cfg.SuccessThresholdCount), uint(cfg.SuccessThresholdCapacity)) + builder = builder.WithSuccessThresholdRatio(cfg.SuccessThresholdCount, cfg.SuccessThresholdCapacity) } else { - builder = builder.WithSuccessThreshold(uint(cfg.SuccessThresholdCount)) + builder = builder.WithSuccessThreshold(cfg.SuccessThresholdCount) } } diff --git a/upstream/http_json_rpc_client.go b/upstream/http_json_rpc_client.go index d6792d1a..2d045926 100644 --- a/upstream/http_json_rpc_client.go +++ b/upstream/http_json_rpc_client.go @@ -259,6 +259,12 @@ func (c *GenericHttpJsonRpcClient) processBatch() { go func() { resp, err := c.httpClient.Do(httpReq) if err != nil { + if resp != nil { + er := resp.Body.Close() + if er != nil { + c.logger.Error().Err(er).Msgf("failed to close response body") + } + } batchErrChan <- err } else { batchRespChan <- resp @@ -290,8 +296,8 @@ func (c *GenericHttpJsonRpcClient) processBatch() { } func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{}]*batchRequest, resp *http.Response) { - respBody, err := io.ReadAll(resp.Body) defer resp.Body.Close() + respBody, err := io.ReadAll(resp.Body) if err != nil { for _, req := range requests { req.err <- err @@ -299,7 +305,9 @@ func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{} return } - c.logger.Debug().Str("body", string(respBody)).Msgf("received batch response") + if c.logger.GetLevel() == zerolog.DebugLevel { + c.logger.Debug().Str("body", string(respBody)).Msgf("received batch response") + } // Usually when upstream is dead and returns a non-JSON response body if respBody[0] == '<' { @@ -419,9 +427,8 @@ func (c *GenericHttpJsonRpcClient) sendSingleRequest(ctx context.Context, req *c } return nil, err } - - respBody, err := io.ReadAll(resp.Body) defer resp.Body.Close() + respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, err } diff --git a/upstream/pimlico_http_json_rpc_client.go b/upstream/pimlico_http_json_rpc_client.go index 60ca311f..a4e73d3d 100644 --- a/upstream/pimlico_http_json_rpc_client.go +++ b/upstream/pimlico_http_json_rpc_client.go @@ -133,7 +133,7 @@ func (c *PimlicoHttpJsonRpcClient) SupportsNetwork(networkId string) (bool, erro } ctx, cancel := context.WithTimeoutCause(context.Background(), 10*time.Second, errors.New("pimlico client timeout during eth_chainId")) defer cancel() - rid := rand.Intn(1000000) + rid := rand.Intn(100_000_000) // #nosec G404 pr := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"eth_chainId","params":[]}`, rid))) resp, err := client.SendRequest(ctx, pr) if err != nil { diff --git a/upstream/ratelimiter_autotuner.go b/upstream/ratelimiter_autotuner.go index 51b41bf2..5a82bbc0 100644 --- a/upstream/ratelimiter_autotuner.go +++ b/upstream/ratelimiter_autotuner.go @@ -4,9 +4,12 @@ import ( "math" "sync" "time" + + "github.com/rs/zerolog" ) type RateLimitAutoTuner struct { + logger *zerolog.Logger budget *RateLimiterBudget errorCounts map[string]*ErrorCounter lastAdjustments map[string]time.Time @@ -26,6 +29,7 @@ type ErrorCounter struct { } func NewRateLimitAutoTuner( + logger *zerolog.Logger, budget *RateLimiterBudget, adjustmentPeriod time.Duration, errorRateThreshold, @@ -88,16 +92,19 @@ func (arl *RateLimitAutoTuner) adjustBudget(method string) { errorRate := float64(erc) / float64(ttc) - var newMaxCount int + var newMaxCount uint if errorRate > arl.errorRateThreshold { - newMaxCount = int(math.Ceil(float64(currentMax) * arl.decreaseFactor)) + newMaxCount = uint(math.Ceil(float64(currentMax) * arl.decreaseFactor)) } else if errorRate == 0 { - newMaxCount = int(math.Ceil(float64(currentMax) * arl.increaseFactor)) + newMaxCount = uint(math.Ceil(float64(currentMax) * arl.increaseFactor)) } else { continue } - arl.budget.AdjustBudget(rule, newMaxCount) + err := arl.budget.AdjustBudget(rule, newMaxCount) + if err != nil { + arl.logger.Warn().Err(err).Msgf("failed to adjust budget for method %s", method) + } } arl.lastAdjustments[method] = time.Now() diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index 8c801c70..25a9cea1 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -36,7 +36,7 @@ func (b *RateLimiterBudget) GetRulesByMethod(method string) []*RateLimitRule { return rules } -func (b *RateLimiterBudget) AdjustBudget(rule *RateLimitRule, newMaxCount int) error { +func (b *RateLimiterBudget) AdjustBudget(rule *RateLimitRule, newMaxCount uint) error { b.rulesMu.Lock() defer b.rulesMu.Unlock() diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index 8cd731e1..214fa37b 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -70,7 +70,7 @@ func (r *RateLimitersRegistry) createRateLimiter(rule *common.RateLimitRuleConfi return nil, common.NewErrRateLimitInvalidConfig(fmt.Errorf("failed to parse duration for limit %v: %w", rule, err)) } - builder := ratelimiter.BurstyBuilder[interface{}](uint(rule.MaxCount), duration) + builder := ratelimiter.BurstyBuilder[interface{}](rule.MaxCount, duration) if rule.WaitTime != "" { waitTime, err := time.ParseDuration(rule.WaitTime) diff --git a/upstream/registry.go b/upstream/registry.go index 69a43aeb..823ae264 100644 --- a/upstream/registry.go +++ b/upstream/registry.go @@ -284,7 +284,10 @@ func (u *UpstreamsRegistry) scheduleScoreCalculationTimers(ctx context.Context) case <-ctx.Done(): return case <-ticker.C: - u.RefreshUpstreamNetworkMethodScores() + err := u.RefreshUpstreamNetworkMethodScores() + if err != nil { + u.logger.Warn().Err(err).Msgf("failed to refresh upstream network method scores") + } } } }() diff --git a/upstream/thirdweb_http_json_rpc_client.go b/upstream/thirdweb_http_json_rpc_client.go index 1fe74887..8cad3740 100644 --- a/upstream/thirdweb_http_json_rpc_client.go +++ b/upstream/thirdweb_http_json_rpc_client.go @@ -59,7 +59,7 @@ func (c *ThirdwebHttpJsonRpcClient) SupportsNetwork(networkId string) (bool, err ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - rid := rand.Intn(1000000) + rid := rand.Intn(100_000_000) // #nosec G404 pr := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"eth_chainId","params":[]}`, rid))) resp, err := client.SendRequest(ctx, pr) if err != nil { diff --git a/upstream/upstream.go b/upstream/upstream.go index fdef9ec8..25cc3430 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -79,7 +79,10 @@ func NewUpstream( } } - pup.guessUpstreamType() + err = pup.guessUpstreamType() + if err != nil { + return nil, err + } if client, err := cr.GetOrCreateClient(pup); err != nil { return nil, err } else { @@ -456,6 +459,7 @@ func (u *Upstream) initRateLimitAutoTuner() { return } u.rateLimiterAutoTuner = NewRateLimitAutoTuner( + &u.Logger, budget, dur, cfg.ErrorRateThreshold, @@ -621,7 +625,7 @@ func (u *Upstream) shouldSkip(req *common.NormalizedRequest) (reason error, skip return common.NewErrUpstreamSyncing(u.config.Id), true } } - + if !u.shouldHandleMethod(method) { u.Logger.Debug().Str("method", method).Msg("method not allowed or ignored by upstread") return common.NewErrUpstreamMethodIgnored(method, u.config.Id), true