Skip to content

Commit

Permalink
chore: update grpc, http and elastic search request timeouts to 30 se…
Browse files Browse the repository at this point in the history
…conds
  • Loading branch information
anjaliagg9791 committed Oct 5, 2023
1 parent ec9201e commit cd5f6fa
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error {
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger)
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout)
lineageRepository, err := postgres.NewLineageRepository(pgClient)
if err != nil {
return fmt.Errorf("create new lineage repository: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Config struct {
BaseUrl string `mapstructure:"baseurl" default:"localhost:8080"`

// User Identity
Identity IdentityConfig `mapstructure:"identity"`

Identity IdentityConfig `mapstructure:"identity"`
RequestTimeout int `mapstructure:"request_timeout"`
// GRPC Config
GRPC GRPCConfig `mapstructure:"grpc"`
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func Serve(
grpc_health_v1.RegisterHealthServer(grpcServer, healthHandler)

// init http proxy
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*5)
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*time.Duration(config.RequestTimeout))
defer grpcDialCancel()

headerMatcher := makeHeaderMatcher(config)
Expand Down Expand Up @@ -160,7 +160,7 @@ func Serve(
mux.WithHTTPTarget(config.addr(), &http.Server{
Handler: gwmux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
WriteTimeout: time.Duration(config.RequestTimeout) * time.Second,
IdleTimeout: 120 * time.Second,
}),
mux.WithGRPCTarget(config.grpcAddr(), grpcServer),
Expand Down
12 changes: 7 additions & 5 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
// DiscoveryRepository implements discovery.Repository
// with elasticsearch as the backing store.
type DiscoveryRepository struct {
cli *Client
logger log.Logger
cli *Client
logger log.Logger
requestTimeout int
}

func NewDiscoveryRepository(cli *Client, logger log.Logger) *DiscoveryRepository {
func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout int) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
logger: logger,
cli: cli,
logger: logger,
requestTimeout: requestTimeout,
}
}

Expand Down
20 changes: 10 additions & 10 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
err = repo.Upsert(ctx, asset.Asset{
ID: "",
Type: asset.TypeTable,
Expand All @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
err = repo.Upsert(ctx, asset.Asset{
ID: "sample-id",
Type: asset.Type("unknown-type"),
Expand All @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

// upsert with create_time as a object
err = repo.Upsert(ctx, asset.Asset{
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
err = repo.Upsert(ctx, ast)
assert.NoError(t, err)

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

err = repo.Upsert(ctx, existingAsset)
assert.NoError(t, err)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
err = repo.DeleteByID(ctx, "")
assert.ErrorIs(t, err, asset.ErrEmptyID)
})
Expand All @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

err = repo.Upsert(ctx, ast)
require.NoError(t, err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

t.Run("should return error if the given urn is empty", func(t *testing.T) {
err = repo.DeleteByURN(ctx, "")
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon
search.WithIgnoreUnavailable(true),
search.WithSourceIncludes(returnedAssetFieldsResult...),
search.WithContext(ctx),
search.WithTimeout(time.Duration(repo.requestTimeout)*time.Second),
)
if err != nil {
return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("execute search: %w", err)}
Expand Down
3 changes: 2 additions & 1 deletion internal/store/elasticsearch/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const (
)

type Config struct {
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
RequestTimeout int `mapstructure:"request_timeout"`
}

type searchHit struct {
Expand Down

0 comments on commit cd5f6fa

Please sign in to comment.