diff --git a/cli/config.go b/cli/config.go index 75c66100..5407cf5c 100644 --- a/cli/config.go +++ b/cli/config.go @@ -104,11 +104,7 @@ type Config struct { // Column search excluded keyword list ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"` - Asset Asset `mapstructure:"asset"` -} - -type Asset struct { - AdditionalTypes []string `mapstructure:"additional_types"` + Asset asset.Config `mapstructure:"asset"` } func LoadConfig() (*Config, error) { diff --git a/cli/server.go b/cli/server.go index f3070021..446328a9 100644 --- a/cli/server.go +++ b/cli/server.go @@ -83,6 +83,10 @@ func serverMigrateCommand(cfg *Config) *cobra.Command { } func runServer(ctx context.Context, cfg *Config) error { + if err := cfg.Asset.Validate(); err != nil { + return err + } + logger := initLogger(cfg.LogLevel) logger.Info("compass starting", "version", Version) @@ -154,6 +158,7 @@ func runServer(ctx context.Context, cfg *Config) error { LineageRepo: lineageRepository, Worker: wrkr, Logger: logger, + Config: cfg.Asset, }) defer cancel() diff --git a/compass.yaml.example b/compass.yaml.example index 9d723980..bcd99f44 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -78,3 +78,4 @@ client: asset: additional_types: - fact_source + delete_assets_timeout: 5m \ No newline at end of file diff --git a/core/asset/config.go b/core/asset/config.go new file mode 100644 index 00000000..bc14d641 --- /dev/null +++ b/core/asset/config.go @@ -0,0 +1,21 @@ +package asset + +import ( + "errors" + "time" +) + +var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") + +type Config struct { + AdditionalTypes []string `mapstructure:"additional_types"` + DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` +} + +func (c *Config) Validate() error { + if c.DeleteAssetsTimeout == 0 { + return errDeleteAssetsTimeoutIsZero + } + + return nil +} diff --git a/core/asset/service.go b/core/asset/service.go index 05406436..7fe5b3ff 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -19,6 +19,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger + config Config cancelFnList []func() assetOpCounter metric.Int64Counter @@ -40,6 +41,7 @@ type ServiceDeps struct { LineageRepo LineageRepository Worker Worker Logger log.Logger + Config Config } func NewService(deps ServiceDeps) (service *Service, cancel func()) { @@ -55,6 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { lineageRepository: deps.LineageRepo, worker: deps.Worker, logger: deps.Logger, + config: deps.Config, cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, @@ -149,8 +152,8 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) return 0, err } - if !request.DryRun { - newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + if !request.DryRun && total > 0 { + newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) } @@ -161,16 +164,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) { deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr) if err != nil { - s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err) + s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err) return } if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil { - s.logger.Error("Error occurred during Lineage deletion:", err) + s.logger.Error("error occurred during lineage deletion", "err:", err) } if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil { - s.logger.Error("Error occurred during Elasticsearch deletion:", err) + s.logger.Error("error occurred during elasticsearch deletion", "err:", err) } }