diff --git a/cli/config.go b/cli/config.go index f61aec83..14b9b88e 100644 --- a/cli/config.go +++ b/cli/config.go @@ -103,6 +103,9 @@ type Config struct { // Client Client client.Config `mapstructure:"client"` + + // Column search excluded keyword list + ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"` } func LoadConfig() (*Config, error) { diff --git a/cli/server.go b/cli/server.go index c17b5b14..14573103 100644 --- a/cli/server.go +++ b/cli/server.go @@ -3,8 +3,6 @@ package cli import ( "context" "fmt" - "os" - "github.com/MakeNowJust/heredoc" "github.com/goto/compass/core/asset" "github.com/goto/compass/core/discussion" @@ -19,6 +17,8 @@ import ( "github.com/goto/compass/pkg/telemetry" "github.com/goto/salt/log" "github.com/spf13/cobra" + "os" + "strings" ) // Version of the current build. overridden by the build system. @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error { if err != nil { return fmt.Errorf("create new asset repository: %w", err) } - discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger) + discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, strings.Split(cfg.ColSearchExclusionKeywords, ",")) lineageRepository, err := postgres.NewLineageRepository(pgClient) if err != nil { return fmt.Errorf("create new lineage repository: %w", err) diff --git a/core/asset/discovery.go b/core/asset/discovery.go index ba2fd2c0..e60abf13 100644 --- a/core/asset/discovery.go +++ b/core/asset/discovery.go @@ -43,6 +43,8 @@ type SearchFlags struct { // DisableFuzzy disables fuzziness on search DisableFuzzy bool + + IsColumnSearch bool } // SearchConfig represents a search query along diff --git a/internal/server/v1beta1/search.go b/internal/server/v1beta1/search.go index 35b672b8..4717b502 100644 --- a/internal/server/v1beta1/search.go +++ b/internal/server/v1beta1/search.go @@ -140,5 +140,6 @@ func getSearchFlagsFromFlags(inputFlags *compassv1beta1.SearchFlags) asset.Searc return asset.SearchFlags{ EnableHighlight: inputFlags.GetEnableHighlight(), DisableFuzzy: inputFlags.GetDisableFuzzy(), + IsColumnSearch: inputFlags.GetIsColumnSearch(), } } diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index 2c66dfc4..1ab274ba 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -18,14 +18,16 @@ import ( // DiscoveryRepository implements discovery.Repository // with elasticsearch as the backing store. type DiscoveryRepository struct { - cli *Client - logger log.Logger + cli *Client + logger log.Logger + ColumnSearchExclusionList []string } -func NewDiscoveryRepository(cli *Client, logger log.Logger) *DiscoveryRepository { +func NewDiscoveryRepository(cli *Client, logger log.Logger, colSearchExclusionList []string) *DiscoveryRepository { return &DiscoveryRepository{ - cli: cli, - logger: logger, + cli: cli, + logger: logger, + ColumnSearchExclusionList: colSearchExclusionList, } } diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index 43fd81e8..e2680872 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, asset.Asset{ ID: "", Type: asset.TypeTable, @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, asset.Asset{ ID: "sample-id", Type: asset.Type("unknown-type"), @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) // upsert with create_time as a object err = repo.Upsert(ctx, asset.Asset{ @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, ast) assert.NoError(t, err) @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, existingAsset) assert.NoError(t, err) @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.DeleteByID(ctx, "") assert.ErrorIs(t, err, asset.ErrEmptyID) }) @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, ast) require.NoError(t, err) @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, ast1) require.NoError(t, err) @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) t.Run("should return error if the given urn is empty", func(t *testing.T) { err = repo.DeleteByURN(ctx, "") @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) err = repo.Upsert(ctx, ast1) require.NoError(t, err) diff --git a/internal/store/elasticsearch/discovery_search_repository.go b/internal/store/elasticsearch/discovery_search_repository.go index 28fc6649..092f09b9 100644 --- a/internal/store/elasticsearch/discovery_search_repository.go +++ b/internal/store/elasticsearch/discovery_search_repository.go @@ -53,7 +53,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon }) }(time.Now()) - query, err := buildQuery(cfg) + query, err := repo.buildQuery(cfg) if err != nil { return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("build query: %w", err)} } @@ -195,17 +195,93 @@ func (repo *DiscoveryRepository) Suggest(ctx context.Context, config asset.Searc return results, nil } -func buildQuery(cfg asset.SearchConfig) (io.Reader, error) { +func (repo *DiscoveryRepository) buildColumnQuery(query *elastic.BoolQuery, cfg asset.SearchConfig, field string) *elastic.Highlight { + matchString := cfg.Text + for _, exclusionStr := range repo.ColumnSearchExclusionList { + exclusionStr = strings.TrimSpace(exclusionStr) + if strings.Contains(matchString, exclusionStr) { + matchString = strings.ReplaceAll(matchString, fmt.Sprintf("_%s", exclusionStr), "") + matchString = strings.ReplaceAll(matchString, fmt.Sprintf(" %s", exclusionStr), "") + matchString = strings.ReplaceAll(matchString, fmt.Sprintf("-%s", exclusionStr), "") + } + } + + if matchString == "" { + matchString = cfg.Text + } + + queries := make([]elastic.Query, 0) + termQuery := elastic.NewTermQuery( + fmt.Sprintf("%s.keyword", field), + cfg.Text, + ).Boost(20) + + descriptionTermQuery := elastic.NewTermQuery( + fmt.Sprintf("%s.keyword", "data.columns.description"), + cfg.Text, + ) + phraseQuery := elastic.NewMultiMatchQuery( + cfg.Text, + []string{"data.columns.name^10", + "data.columns.description"}..., + ).Type("phrase") + + matchQuery := elastic.NewMultiMatchQuery( + matchString, + []string{"data.columns.name^5", + "data.columns.description"}..., + ) + + andMatchQuery := elastic.NewMultiMatchQuery( + matchString, + []string{"data.columns.name^5", + "data.columns.description"}..., + ).Operator("and") + + queries = append(queries, termQuery, descriptionTermQuery, phraseQuery, matchQuery, andMatchQuery) + query.Should(queries...) + + if cfg.Flags.EnableHighlight { + return elastic.NewHighlight(). + Order("score"). + Field("data.columns.name"). + Field("data.columns.description"). + HighlightQuery( + elastic.NewBoolQuery(). + Should(queries...), + ) + } + + return nil +} + +func (repo *DiscoveryRepository) buildQuery(cfg asset.SearchConfig) (io.Reader, error) { boolQuery := elastic.NewBoolQuery() - buildTextQuery(boolQuery, cfg) + var highlightQuery *elastic.Highlight + field := "" + + // if the search text is empty, do a match all query and return results + if strings.TrimSpace(cfg.Text) == "" { + boolQuery.Should(elastic.NewMatchAllQuery()) + highlightQuery = buildHighlightQuery(cfg) + } else { + if cfg.Flags.IsColumnSearch { + field = "data.columns.name" + highlightQuery = repo.buildColumnQuery(boolQuery, cfg, field) + } else { + field = "name" + repo.buildTextQuery(boolQuery, cfg) + highlightQuery = buildHighlightQuery(cfg) + } + } + buildFilterTermQueries(boolQuery, cfg.Filters) buildMustMatchQueries(boolQuery, cfg) - query := buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text) - highlight := buildHighlightQuery(cfg) + query := repo.buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text, field) body, err := elastic.NewSearchRequest(). Query(query). - Highlight(highlight). + Highlight(highlightQuery). MinScore(defaultMinScore). Body() if err != nil { @@ -237,11 +313,7 @@ func buildSuggestQuery(cfg asset.SearchConfig) (io.Reader, error) { return payload, err } -func buildTextQuery(q *elastic.BoolQuery, cfg asset.SearchConfig) { - if strings.TrimSpace(cfg.Text) == "" { - q.Should(elastic.NewMatchAllQuery()) - } - +func (repo *DiscoveryRepository) buildTextQuery(q *elastic.BoolQuery, cfg asset.SearchConfig) { boostedFields := []string{"urn^10", "name^5"} q.Should( // Phrase query cannot have `FUZZINESS` @@ -312,12 +384,12 @@ func buildFilterExistsQueries(q *elastic.BoolQuery, fields []string) { } } -func buildFunctionScoreQuery(query elastic.Query, rankBy, text string) elastic.Query { +func (repo *DiscoveryRepository) buildFunctionScoreQuery(query elastic.Query, rankBy, text string, field string) elastic.Query { // Added exact match term query here so that exact match gets higher priority. fsQuery := elastic.NewFunctionScoreQuery() if text != "" { fsQuery.Add( - elastic.NewTermQuery("name.keyword", text), + elastic.NewTermQuery(fmt.Sprintf("%s.keyword", field), text), elastic.NewWeightFactorFunction(2), ) } diff --git a/internal/store/elasticsearch/discovery_search_repository_test.go b/internal/store/elasticsearch/discovery_search_repository_test.go index fd05b7cf..b69a8ef4 100644 --- a/internal/store/elasticsearch/discovery_search_repository_test.go +++ b/internal/store/elasticsearch/discovery_search_repository_test.go @@ -35,7 +35,7 @@ func TestSearcherSearch(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) type expectedRow struct { Type string @@ -387,7 +387,7 @@ func TestSearcherSuggest(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/suggest-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) t.Run("fixtures", func(t *testing.T) { testCases := []struct { @@ -424,7 +424,7 @@ func loadTestFixture(cli *elasticsearch.Client, esClient *store.Client, filePath ctx := context.TODO() for _, testdata := range data { - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) for _, ast := range testdata.Assets { if err := repo.Upsert(ctx, ast); err != nil { return err @@ -452,7 +452,7 @@ func TestGroupAssets(t *testing.T) { ) require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) _, err = repo.GroupAssets(ctx, asset.GroupConfig{ GroupBy: []string{""}, }) @@ -473,7 +473,7 @@ func TestGroupAssets(t *testing.T) { err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json") require.NoError(t, err) - repo := store.NewDiscoveryRepository(esClient, log.NewNoop()) + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"}) type groupTest struct { Description string diff --git a/internal/store/elasticsearch/schema.go b/internal/store/elasticsearch/schema.go index 18b7d398..ccb3ea7f 100644 --- a/internal/store/elasticsearch/schema.go +++ b/internal/store/elasticsearch/schema.go @@ -9,6 +9,12 @@ var indexSettingsTemplate = `{ %q: {} }, "settings": { + "similarity": { + "my_bm25_without_length_normalization": { + "type": "BM25", + "b": "0" + } + }, "index.mapping.ignore_malformed": true, "analysis": { "analyzer": {