From cd5f6fab26124df435d28dfcd4aeeafc66b0b40f Mon Sep 17 00:00:00 2001 From: "anjali.agarwal" Date: Wed, 4 Oct 2023 16:10:48 +0530 Subject: [PATCH] chore: update grpc, http and elastic search request timeouts to 30 seconds --- cli/server.go | 2 +- internal/server/server.go | 8 ++++---- .../elasticsearch/discovery_repository.go | 12 ++++++----- .../discovery_repository_test.go | 20 +++++++++---------- .../discovery_search_repository.go | 1 + internal/store/elasticsearch/es.go | 3 ++- 6 files changed, 25 insertions(+), 21 deletions(-) diff --git a/cli/server.go b/cli/server.go index c17b5b14..63890a75 100644 --- a/cli/server.go +++ b/cli/server.go @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error { if err != nil { return fmt.Errorf("create new asset repository: %w", err) } - discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger) + discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout) lineageRepository, err := postgres.NewLineageRepository(pgClient) if err != nil { return fmt.Errorf("create new lineage repository: %w", err) diff --git a/internal/server/server.go b/internal/server/server.go index ccdcd77b..ecbf8d21 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -37,8 +37,8 @@ type Config struct { BaseUrl string `mapstructure:"baseurl" default:"localhost:8080"` // User Identity - Identity IdentityConfig `mapstructure:"identity"` - + Identity IdentityConfig `mapstructure:"identity"` + RequestTimeout int `mapstructure:"request_timeout"` // GRPC Config GRPC GRPCConfig `mapstructure:"grpc"` } @@ -105,7 +105,7 @@ func Serve( grpc_health_v1.RegisterHealthServer(grpcServer, healthHandler) // init http proxy - grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*5) + grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*time.Duration(config.RequestTimeout)) defer grpcDialCancel() headerMatcher := makeHeaderMatcher(config) @@ -160,7 +160,7 @@ func Serve( mux.WithHTTPTarget(config.addr(), &http.Server{ Handler: gwmux, ReadTimeout: 5 * time.Second, - WriteTimeout: 10 * time.Second, + WriteTimeout: time.Duration(config.RequestTimeout) * time.Second, IdleTimeout: 120 * time.Second, }), mux.WithGRPCTarget(config.grpcAddr(), grpcServer), diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index 2c66dfc4..aa276b5b 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -18,14 +18,16 @@ import ( // DiscoveryRepository implements discovery.Repository // with elasticsearch as the backing store. type DiscoveryRepository struct { - cli *Client - logger log.Logger + cli *Client + logger log.Logger + requestTimeout int } -func NewDiscoveryRepository(cli *Client, logger log.Logger) *DiscoveryRepository { +func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout int) *DiscoveryRepository { return &DiscoveryRepository{ - cli: cli, - logger: logger, + cli: cli, + logger: logger, + requestTimeout: requestTimeout, } } diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index 43fd81e8..ceba9723 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, asset.Asset{ ID: "", Type: asset.TypeTable, @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, asset.Asset{ ID: "sample-id", Type: asset.Type("unknown-type"), @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) // upsert with create_time as a object err = repo.Upsert(ctx, asset.Asset{ @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, ast) assert.NoError(t, err) @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, existingAsset) assert.NoError(t, err) @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.DeleteByID(ctx, "") assert.ErrorIs(t, err, asset.ErrEmptyID) }) @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, ast) require.NoError(t, err) @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, ast1) require.NoError(t, err) @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) t.Run("should return error if the given urn is empty", func(t *testing.T) { err = repo.DeleteByURN(ctx, "") @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10) err = repo.Upsert(ctx, ast1) require.NoError(t, err) diff --git a/internal/store/elasticsearch/discovery_search_repository.go b/internal/store/elasticsearch/discovery_search_repository.go index cff8ff45..1f4f8ee1 100644 --- a/internal/store/elasticsearch/discovery_search_repository.go +++ b/internal/store/elasticsearch/discovery_search_repository.go @@ -62,6 +62,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon search.WithIgnoreUnavailable(true), search.WithSourceIncludes(returnedAssetFieldsResult...), search.WithContext(ctx), + search.WithTimeout(time.Duration(repo.requestTimeout)*time.Second), ) if err != nil { return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("execute search: %w", err)} diff --git a/internal/store/elasticsearch/es.go b/internal/store/elasticsearch/es.go index 0e7797f5..c90d9120 100644 --- a/internal/store/elasticsearch/es.go +++ b/internal/store/elasticsearch/es.go @@ -27,7 +27,8 @@ const ( ) type Config struct { - Brokers string `mapstructure:"brokers" default:"http://localhost:9200"` + Brokers string `mapstructure:"brokers" default:"http://localhost:9200"` + RequestTimeout int `mapstructure:"request_timeout"` } type searchHit struct {