From 85cd9f72324afc6d1ca642e98418a2cc9341e6ba Mon Sep 17 00:00:00 2001 From: "anjali.agarwal" Date: Fri, 6 Oct 2023 12:21:19 +0530 Subject: [PATCH] change request timeout from int to time.Duration --- cli/server.go | 2 +- cli/worker.go | 2 +- compass.yaml.example | 6 +++--- internal/server/server.go | 16 +++++++-------- .../elasticsearch/discovery_repository.go | 4 ++-- .../discovery_repository_test.go | 20 +++++++++---------- .../discovery_search_repository.go | 2 +- .../discovery_search_repository_test.go | 11 +++++----- internal/store/elasticsearch/es.go | 4 ++-- 9 files changed, 34 insertions(+), 33 deletions(-) diff --git a/cli/server.go b/cli/server.go index ca5539ff..63890a75 100644 --- a/cli/server.go +++ b/cli/server.go @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error { if err != nil { return fmt.Errorf("create new asset repository: %w", err) } - discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds) + discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout) lineageRepository, err := postgres.NewLineageRepository(pgClient) if err != nil { return fmt.Errorf("create new lineage repository: %w", err) diff --git a/cli/worker.go b/cli/worker.go index de49096e..c9b75aad 100644 --- a/cli/worker.go +++ b/cli/worker.go @@ -68,7 +68,7 @@ func runWorker(ctx context.Context, cfg *Config) error { mgr, err := workermanager.New(ctx, workermanager.Deps{ Config: cfg.Worker, - DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds), + DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout), Logger: logger, }) if err != nil { diff --git a/compass.yaml.example b/compass.yaml.example index 09bb43cb..8922c2d6 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -34,7 +34,7 @@ newrelic: elasticsearch: brokers: http://localhost:9200 - request_timeout_in_seconds: 10 + request_timeout_in_seconds: 10s db: host: localhost @@ -47,7 +47,7 @@ db: service: host: localhost port: 8080 - request_timeout_in_seconds: 10 + request_timeout_in_seconds: 10s identity: headerkey_uuid: Compass-User-UUID headerkey_email: Compass-User-Email @@ -56,7 +56,7 @@ service: port: 8081 max_send_msg_size: 33554432 max_recv_msg_size: 33554432 - request_timeout_in_seconds: 5 + request_timeout_in_seconds: 5s worker: enabled: true diff --git a/internal/server/server.go b/internal/server/server.go index 8c61dff1..2c1fe77b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -37,8 +37,8 @@ type Config struct { BaseUrl string `mapstructure:"baseurl" default:"localhost:8080"` // User Identity - Identity IdentityConfig `mapstructure:"identity"` - RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"` + Identity IdentityConfig `mapstructure:"identity"` + RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"` // GRPC Config GRPC GRPCConfig `mapstructure:"grpc"` } @@ -55,10 +55,10 @@ type IdentityConfig struct { } type GRPCConfig struct { - Port int `yaml:"port" mapstructure:"port" default:"8081"` - RequestTimeoutInSeconds int `yaml:"request_timeout_in_seconds" mapstructure:"request_timeout_in_seconds" default:"5"` - MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"` - MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"` + Port int `yaml:"port" mapstructure:"port" default:"8081"` + RequestTimeout time.Duration `yaml:"request_timeout" mapstructure:"request_timeout" default:"5s"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"` + MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"` } func Serve( @@ -106,7 +106,7 @@ func Serve( grpc_health_v1.RegisterHealthServer(grpcServer, healthHandler) // init http proxy - grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*time.Duration(config.GRPC.RequestTimeoutInSeconds)) + grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, config.GRPC.RequestTimeout) defer grpcDialCancel() headerMatcher := makeHeaderMatcher(config) @@ -162,7 +162,7 @@ func Serve( mux.WithHTTPTarget(config.addr(), &http.Server{ Handler: gwmux, ReadTimeout: 5 * time.Second, - WriteTimeout: time.Duration(config.RequestTimeoutInSeconds) * time.Second, + WriteTimeout: config.RequestTimeout, IdleTimeout: 120 * time.Second, }), mux.WithGRPCTarget(config.grpcAddr(), grpcServer), diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index aa276b5b..c193237b 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -20,10 +20,10 @@ import ( type DiscoveryRepository struct { cli *Client logger log.Logger - requestTimeout int + requestTimeout time.Duration } -func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout int) *DiscoveryRepository { +func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration) *DiscoveryRepository { return &DiscoveryRepository{ cli: cli, logger: logger, diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index ceba9723..5befa32a 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, asset.Asset{ ID: "", Type: asset.TypeTable, @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, asset.Asset{ ID: "sample-id", Type: asset.Type("unknown-type"), @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) // upsert with create_time as a object err = repo.Upsert(ctx, asset.Asset{ @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, ast) assert.NoError(t, err) @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, existingAsset) assert.NoError(t, err) @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.DeleteByID(ctx, "") assert.ErrorIs(t, err, asset.ErrEmptyID) }) @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, ast) require.NoError(t, err) @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, ast1) require.NoError(t, err) @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) t.Run("should return error if the given urn is empty", func(t *testing.T) { err = repo.DeleteByURN(ctx, "") @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) err = repo.Upsert(ctx, ast1) require.NoError(t, err) diff --git a/internal/store/elasticsearch/discovery_search_repository.go b/internal/store/elasticsearch/discovery_search_repository.go index f74a56a7..0ce7e189 100644 --- a/internal/store/elasticsearch/discovery_search_repository.go +++ b/internal/store/elasticsearch/discovery_search_repository.go @@ -68,7 +68,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon search.WithIgnoreUnavailable(true), search.WithSourceIncludes(returnedAssetFieldsResult...), search.WithContext(ctx), - search.WithTimeout(time.Duration(repo.requestTimeout)*time.Second), + search.WithTimeout(repo.requestTimeout), ) if err != nil { return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("execute search: %w", err)} diff --git a/internal/store/elasticsearch/discovery_search_repository_test.go b/internal/store/elasticsearch/discovery_search_repository_test.go index 6433040a..74f2b97e 100644 --- a/internal/store/elasticsearch/discovery_search_repository_test.go +++ b/internal/store/elasticsearch/discovery_search_repository_test.go @@ -6,6 +6,7 @@ import ( "os" "sort" "testing" + "time" "github.com/elastic/go-elasticsearch/v7" "github.com/goto/compass/core/asset" @@ -35,7 +36,7 @@ func TestSearcherSearch(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) type expectedRow struct { Type string @@ -387,7 +388,7 @@ func TestSearcherSuggest(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/suggest-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) t.Run("fixtures", func(t *testing.T) { testCases := []struct { @@ -424,7 +425,7 @@ func loadTestFixture(cli *elasticsearch.Client, esClient *store.Client, filePath ctx := context.TODO() for _, testdata := range data { - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) for _, ast := range testdata.Assets { if err := repo.Upsert(ctx, ast); err != nil { return err @@ -452,7 +453,7 @@ func TestGroupAssets(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) _, err = repo.GroupAssets(ctx, asset.GroupConfig{ GroupBy: []string{""}, }) @@ -473,7 +474,7 @@ func TestGroupAssets(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second) type groupTest struct { Description string diff --git a/internal/store/elasticsearch/es.go b/internal/store/elasticsearch/es.go index 7d8f6aaa..11aa821b 100644 --- a/internal/store/elasticsearch/es.go +++ b/internal/store/elasticsearch/es.go @@ -27,8 +27,8 @@ const ( ) type Config struct { - Brokers string `mapstructure:"brokers" default:"http://localhost:9200"` - RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"` + Brokers string `mapstructure:"brokers" default:"http://localhost:9200"` + RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"` } type searchHit struct {