Skip to content

Commit

Permalink
change request timeout from int to time.Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
anjaliagg9791 committed Oct 6, 2023
1 parent 8b34a2b commit 85cd9f7
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error {
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds)
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout)
lineageRepository, err := postgres.NewLineageRepository(pgClient)
if err != nil {
return fmt.Errorf("create new lineage repository: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runWorker(ctx context.Context, cfg *Config) error {

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeoutInSeconds),
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout),
Logger: logger,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ newrelic:

elasticsearch:
brokers: http://localhost:9200
request_timeout_in_seconds: 10
request_timeout_in_seconds: 10s

db:
host: localhost
Expand All @@ -47,7 +47,7 @@ db:
service:
host: localhost
port: 8080
request_timeout_in_seconds: 10
request_timeout_in_seconds: 10s
identity:
headerkey_uuid: Compass-User-UUID
headerkey_email: Compass-User-Email
Expand All @@ -56,7 +56,7 @@ service:
port: 8081
max_send_msg_size: 33554432
max_recv_msg_size: 33554432
request_timeout_in_seconds: 5
request_timeout_in_seconds: 5s

worker:
enabled: true
Expand Down
16 changes: 8 additions & 8 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Config struct {
BaseUrl string `mapstructure:"baseurl" default:"localhost:8080"`

// User Identity
Identity IdentityConfig `mapstructure:"identity"`
RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"`
Identity IdentityConfig `mapstructure:"identity"`
RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"`
// GRPC Config
GRPC GRPCConfig `mapstructure:"grpc"`
}
Expand All @@ -55,10 +55,10 @@ type IdentityConfig struct {
}

type GRPCConfig struct {
Port int `yaml:"port" mapstructure:"port" default:"8081"`
RequestTimeoutInSeconds int `yaml:"request_timeout_in_seconds" mapstructure:"request_timeout_in_seconds" default:"5"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"`
MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"`
Port int `yaml:"port" mapstructure:"port" default:"8081"`
RequestTimeout time.Duration `yaml:"request_timeout" mapstructure:"request_timeout" default:"5s"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size" default:"33554432"`
MaxSendMsgSize int `yaml:"max_send_msg_size" mapstructure:"max_send_msg_size" default:"33554432"`
}

func Serve(
Expand Down Expand Up @@ -106,7 +106,7 @@ func Serve(
grpc_health_v1.RegisterHealthServer(grpcServer, healthHandler)

// init http proxy
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, time.Second*time.Duration(config.GRPC.RequestTimeoutInSeconds))
grpcDialCtx, grpcDialCancel := context.WithTimeout(ctx, config.GRPC.RequestTimeout)
defer grpcDialCancel()

headerMatcher := makeHeaderMatcher(config)
Expand Down Expand Up @@ -162,7 +162,7 @@ func Serve(
mux.WithHTTPTarget(config.addr(), &http.Server{
Handler: gwmux,
ReadTimeout: 5 * time.Second,
WriteTimeout: time.Duration(config.RequestTimeoutInSeconds) * time.Second,
WriteTimeout: config.RequestTimeout,
IdleTimeout: 120 * time.Second,
}),
mux.WithGRPCTarget(config.grpcAddr(), grpcServer),
Expand Down
4 changes: 2 additions & 2 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
type DiscoveryRepository struct {
cli *Client
logger log.Logger
requestTimeout int
requestTimeout time.Duration
}

func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout int) *DiscoveryRepository {
func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
logger: logger,
Expand Down
20 changes: 10 additions & 10 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
err = repo.Upsert(ctx, asset.Asset{
ID: "",
Type: asset.TypeTable,
Expand All @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
err = repo.Upsert(ctx, asset.Asset{
ID: "sample-id",
Type: asset.Type("unknown-type"),
Expand All @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

// upsert with create_time as a object
err = repo.Upsert(ctx, asset.Asset{
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
err = repo.Upsert(ctx, ast)
assert.NoError(t, err)

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

err = repo.Upsert(ctx, existingAsset)
assert.NoError(t, err)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
err = repo.DeleteByID(ctx, "")
assert.ErrorIs(t, err, asset.ErrEmptyID)
})
Expand All @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

err = repo.Upsert(ctx, ast)
require.NoError(t, err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

t.Run("should return error if the given urn is empty", func(t *testing.T) {
err = repo.DeleteByURN(ctx, "")
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon
search.WithIgnoreUnavailable(true),
search.WithSourceIncludes(returnedAssetFieldsResult...),
search.WithContext(ctx),
search.WithTimeout(time.Duration(repo.requestTimeout)*time.Second),
search.WithTimeout(repo.requestTimeout),
)
if err != nil {
return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("execute search: %w", err)}
Expand Down
11 changes: 6 additions & 5 deletions internal/store/elasticsearch/discovery_search_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"sort"
"testing"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/goto/compass/core/asset"
Expand Down Expand Up @@ -35,7 +36,7 @@ func TestSearcherSearch(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

type expectedRow struct {
Type string
Expand Down Expand Up @@ -387,7 +388,7 @@ func TestSearcherSuggest(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/suggest-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

t.Run("fixtures", func(t *testing.T) {
testCases := []struct {
Expand Down Expand Up @@ -424,7 +425,7 @@ func loadTestFixture(cli *elasticsearch.Client, esClient *store.Client, filePath

ctx := context.TODO()
for _, testdata := range data {
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
for _, ast := range testdata.Assets {
if err := repo.Upsert(ctx, ast); err != nil {
return err
Expand Down Expand Up @@ -452,7 +453,7 @@ func TestGroupAssets(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)
_, err = repo.GroupAssets(ctx, asset.GroupConfig{
GroupBy: []string{""},
})
Expand All @@ -473,7 +474,7 @@ func TestGroupAssets(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), 10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Duration(10)*time.Second)

type groupTest struct {
Description string
Expand Down
4 changes: 2 additions & 2 deletions internal/store/elasticsearch/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const (
)

type Config struct {
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
RequestTimeoutInSeconds int `mapstructure:"request_timeout_in_seconds" default:"10"`
Brokers string `mapstructure:"brokers" default:"http://localhost:9200"`
RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"`
}

type searchHit struct {
Expand Down

0 comments on commit 85cd9f7

Please sign in to comment.