Skip to content

Commit

Permalink
change request timeout from int to time.Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
anjaliagg9791 committed Oct 6, 2023
1 parent 8b34a2b commit 47da738
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error {
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds)
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout)
lineageRepository, err := postgres.NewLineageRepository(pgClient)
if err != nil {
return fmt.Errorf("create new lineage repository: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runWorker(ctx context.Context, cfg *Config) error {

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds),
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout),
Logger: logger,
})
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Config struct {
BaseUrl string `mapstructure:"baseurl" default:"localhost:8080"`

// User Identity
Identity IdentityConfig `mapstructure:"identity"`
RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"`
Identity IdentityConfig `mapstructure:"identity"`
RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"`
// GRPC Config
GRPC GRPCConfig `mapstructure:"grpc"`
}
Expand All @@ -55,10 +55,10 @@ type IdentityConfig struct {
}

type GRPCConfig struct {
Port int `yaml:"port" mapstructure:"port" default:"8081"`
RequestTimeoutInSeconds int `yaml:"request_timeout_in_seconds" mapstructure:"request_timeout_in_seconds" default:"5"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"`
MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"`
Port int `yaml:"port" mapstructure:"port" default:"8081"`
RequestTimeout time.Duration `yaml:"request_timeout" mapstructure:"request_timeout" default:"5s"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"`
MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"`
}

func Serve(
Expand Down Expand Up @@ -106,7 +106,7 @@ func Serve(
grpc_health_v1.RegisterHealthServer(grpcServer, healthHandler)

// init http proxy
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*time.Duration(config.GRPC.RequestTimeoutInSeconds))
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, config.GRPC.RequestTimeout)
defer grpcDialCancel()

headerMatcher := makeHeaderMatcher(config)
Expand Down Expand Up @@ -162,7 +162,7 @@ func Serve(
mux.WithHTTPTarget(config.addr(), &http.Server{
Handler: gwmux,
ReadTimeout: 5 * time.Second,
WriteTimeout: time.Duration(config.RequestTimeoutInSeconds) * time.Second,
WriteTimeout: config.RequestTimeout,
IdleTimeout: 120 * time.Second,
}),
mux.WithGRPCTarget(config.grpcAddr(), grpcServer),
Expand Down
4 changes: 2 additions & 2 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
type DiscoveryRepository struct {
cli *Client
logger log.Logger
requestTimeout int
requestTimeout time.Duration
}

func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout int) *DiscoveryRepository {
func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
logger: logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon
search.WithIgnoreUnavailable(true),
search.WithSourceIncludes(returnedAssetFieldsResult...),
search.WithContext(ctx),
search.WithTimeout(time.Duration(repo.requestTimeout)*time.Second),
search.WithTimeout(repo.requestTimeout),
)
if err != nil {
return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("execute search: %w", err)}
Expand Down
11 changes: 6 additions & 5 deletions internal/store/elasticsearch/discovery_search_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"sort"
"testing"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/goto/compass/core/asset"
Expand Down Expand Up @@ -35,7 +36,7 @@ func TestSearcherSearch(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10))

type expectedRow struct {
Type string
Expand Down Expand Up @@ -387,7 +388,7 @@ func TestSearcherSuggest(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/suggest-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10))

t.Run("fixtures", func(t *testing.T) {
testCases := []struct {
Expand Down Expand Up @@ -424,7 +425,7 @@ func loadTestFixture(cli *elasticsearch.Client, esClient *store.Client, filePath

ctx := context.TODO()
for _, testdata := range data {
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10))
for _, ast := range testdata.Assets {
if err := repo.Upsert(ctx, ast); err != nil {
return err
Expand Down Expand Up @@ -452,7 +453,7 @@ func TestGroupAssets(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10))
_, err = repo.GroupAssets(ctx, asset.GroupConfig{
GroupBy: []string{""},
})
Expand All @@ -473,7 +474,7 @@ func TestGroupAssets(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10))

type groupTest struct {
Description string
Expand Down
4 changes: 2 additions & 2 deletions internal/store/elasticsearch/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const (
)

type Config struct {
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"`
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"`
}

type searchHit struct {
Expand Down

0 comments on commit 47da738

Please sign in to comment.