Skip to content

Commit

Permalink
feat: enable column level search
Browse files Browse the repository at this point in the history
  • Loading branch information
anjaliagg9791 committed Oct 5, 2023
1 parent d11283b commit 9012b79
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 36 deletions.
3 changes: 3 additions & 0 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type Config struct {

// Client
Client client.Config `mapstructure:"client"`

// Comma-separated list of keywords excluded from column search text
ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"`
}

func LoadConfig() (*Config, error) {
Expand Down
6 changes: 3 additions & 3 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package cli
import (
"context"
"fmt"
"os"

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/core/asset"
"github.com/goto/compass/core/discussion"
Expand All @@ -19,6 +17,8 @@ import (
"github.com/goto/compass/pkg/telemetry"
"github.com/goto/salt/log"
"github.com/spf13/cobra"
"os"
"strings"
)

// Version of the current build. overridden by the build system.
Expand Down Expand Up @@ -131,7 +131,7 @@ func runServer(ctx context.Context, cfg *Config) error {
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger)
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, strings.Split(cfg.ColSearchExclusionKeywords, ","))
lineageRepository, err := postgres.NewLineageRepository(pgClient)
if err != nil {
return fmt.Errorf("create new lineage repository: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type SearchFlags struct {

// DisableFuzzy disables fuzziness on search
DisableFuzzy bool

IsColumnSearch bool
}

// SearchConfig represents a search query along
Expand Down
1 change: 1 addition & 0 deletions internal/server/v1beta1/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ func getSearchFlagsFromFlags(inputFlags *compassv1beta1.SearchFlags) asset.Searc
return asset.SearchFlags{
EnableHighlight: inputFlags.GetEnableHighlight(),
DisableFuzzy: inputFlags.GetDisableFuzzy(),
IsColumnSearch: inputFlags.GetIsColumnSearch(),
}
}
12 changes: 7 additions & 5 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
// DiscoveryRepository implements discovery.Repository
// with elasticsearch as the backing store.
type DiscoveryRepository struct {
cli *Client
logger log.Logger
cli *Client
logger log.Logger
ColumnSearchExclusionList []string
}

func NewDiscoveryRepository(cli *Client, logger log.Logger) *DiscoveryRepository {
func NewDiscoveryRepository(cli *Client, logger log.Logger, colSearchExclusionList []string) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
logger: logger,
cli: cli,
logger: logger,
ColumnSearchExclusionList: colSearchExclusionList,
}
}

Expand Down
20 changes: 10 additions & 10 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
err = repo.Upsert(ctx, asset.Asset{
ID: "",
Type: asset.TypeTable,
Expand All @@ -50,7 +50,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
err = repo.Upsert(ctx, asset.Asset{
ID: "sample-id",
Type: asset.Type("unknown-type"),
Expand All @@ -69,7 +69,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

// upsert with create_time as an object
err = repo.Upsert(ctx, asset.Asset{
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
err = repo.Upsert(ctx, ast)
assert.NoError(t, err)

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

err = repo.Upsert(ctx, existingAsset)
assert.NoError(t, err)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
err = repo.DeleteByID(ctx, "")
assert.ErrorIs(t, err, asset.ErrEmptyID)
})
Expand All @@ -241,7 +241,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

err = repo.Upsert(ctx, ast)
require.NoError(t, err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

t.Run("should return error if the given urn is empty", func(t *testing.T) {
err = repo.DeleteByURN(ctx, "")
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down
98 changes: 85 additions & 13 deletions internal/store/elasticsearch/discovery_search_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon
})
}(time.Now())

query, err := buildQuery(cfg)
query, err := repo.buildQuery(cfg)
if err != nil {
return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("build query: %w", err)}
}
Expand Down Expand Up @@ -195,17 +195,93 @@ func (repo *DiscoveryRepository) Suggest(ctx context.Context, config asset.Searc
return results, nil
}

func buildQuery(cfg asset.SearchConfig) (io.Reader, error) {
func (repo *DiscoveryRepository) buildColumnQuery(query *elastic.BoolQuery, cfg asset.SearchConfig, field string) *elastic.Highlight {
matchString := cfg.Text
for _, exclusionStr := range repo.ColumnSearchExclusionList {
exclusionStr = strings.TrimSpace(exclusionStr)
if strings.Contains(matchString, exclusionStr) {
matchString = strings.ReplaceAll(matchString, fmt.Sprintf("_%s", exclusionStr), "")
matchString = strings.ReplaceAll(matchString, fmt.Sprintf(" %s", exclusionStr), "")
matchString = strings.ReplaceAll(matchString, fmt.Sprintf("-%s", exclusionStr), "")
}
}

if matchString == "" {
matchString = cfg.Text
}

queries := make([]elastic.Query, 0)
termQuery := elastic.NewTermQuery(
fmt.Sprintf("%s.keyword", field),
cfg.Text,
).Boost(20)

descriptionTermQuery := elastic.NewTermQuery(
fmt.Sprintf("%s.keyword", "data.columns.description"),
cfg.Text,
)
phraseQuery := elastic.NewMultiMatchQuery(
cfg.Text,
[]string{"data.columns.name^10",

Check failure on line 225 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
"data.columns.description"}...,
).Type("phrase")

matchQuery := elastic.NewMultiMatchQuery(
matchString,
[]string{"data.columns.name^5",

Check failure on line 231 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
"data.columns.description"}...,
)

andMatchQuery := elastic.NewMultiMatchQuery(
matchString,
[]string{"data.columns.name^5",

Check failure on line 237 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
"data.columns.description"}...,
).Operator("and")

queries = append(queries, termQuery, descriptionTermQuery, phraseQuery, matchQuery, andMatchQuery)
query.Should(queries...)

if cfg.Flags.EnableHighlight {
return elastic.NewHighlight().
Order("score").
Field("data.columns.name").
Field("data.columns.description").
HighlightQuery(
elastic.NewBoolQuery().
Should(queries...),
)
}

return nil
}

func (repo *DiscoveryRepository) buildQuery(cfg asset.SearchConfig) (io.Reader, error) {
boolQuery := elastic.NewBoolQuery()
buildTextQuery(boolQuery, cfg)
var highlightQuery *elastic.Highlight
field := ""

// if the search text is empty, do a match all query and return results
if strings.TrimSpace(cfg.Text) == "" {
boolQuery.Should(elastic.NewMatchAllQuery())
highlightQuery = buildHighlightQuery(cfg)
} else {
if cfg.Flags.IsColumnSearch {
field = "data.columns.name"
highlightQuery = repo.buildColumnQuery(boolQuery, cfg, field)
} else {
field = "name"
repo.buildTextQuery(boolQuery, cfg)
highlightQuery = buildHighlightQuery(cfg)
}
}

buildFilterTermQueries(boolQuery, cfg.Filters)
buildMustMatchQueries(boolQuery, cfg)
query := buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text)
highlight := buildHighlightQuery(cfg)
query := repo.buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text, field)

body, err := elastic.NewSearchRequest().
Query(query).
Highlight(highlight).
Highlight(highlightQuery).
MinScore(defaultMinScore).
Body()
if err != nil {
Expand Down Expand Up @@ -237,11 +313,7 @@ func buildSuggestQuery(cfg asset.SearchConfig) (io.Reader, error) {
return payload, err
}

func buildTextQuery(q *elastic.BoolQuery, cfg asset.SearchConfig) {
if strings.TrimSpace(cfg.Text) == "" {
q.Should(elastic.NewMatchAllQuery())
}

func (repo *DiscoveryRepository) buildTextQuery(q *elastic.BoolQuery, cfg asset.SearchConfig) {

Check failure on line 316 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

unused-receiver: method receiver 'repo' is not referenced in method's body, consider removing or renaming it as _ (revive)
boostedFields := []string{"urn^10", "name^5"}
q.Should(
// Phrase query cannot have `FUZZINESS`
Expand Down Expand Up @@ -312,12 +384,12 @@ func buildFilterExistsQueries(q *elastic.BoolQuery, fields []string) {
}
}

func buildFunctionScoreQuery(query elastic.Query, rankBy, text string) elastic.Query {
func (repo *DiscoveryRepository) buildFunctionScoreQuery(query elastic.Query, rankBy, text string, field string) elastic.Query {

Check failure on line 387 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 387 in internal/store/elasticsearch/discovery_search_repository.go

View workflow job for this annotation

GitHub Actions / golangci

unused-receiver: method receiver 'repo' is not referenced in method's body, consider removing or renaming it as _ (revive)
// Added exact match term query here so that exact match gets higher priority.
fsQuery := elastic.NewFunctionScoreQuery()
if text != "" {
fsQuery.Add(
elastic.NewTermQuery("name.keyword", text),
elastic.NewTermQuery(fmt.Sprintf("%s.keyword", field), text),
elastic.NewWeightFactorFunction(2),
)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/store/elasticsearch/discovery_search_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSearcherSearch(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

type expectedRow struct {
Type string
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestSearcherSuggest(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/suggest-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

t.Run("fixtures", func(t *testing.T) {
testCases := []struct {
Expand Down Expand Up @@ -424,7 +424,7 @@ func loadTestFixture(cli *elasticsearch.Client, esClient *store.Client, filePath

ctx := context.TODO()
for _, testdata := range data {
repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
for _, ast := range testdata.Assets {
if err := repo.Upsert(ctx, ast); err != nil {
return err
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestGroupAssets(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})
_, err = repo.GroupAssets(ctx, asset.GroupConfig{
GroupBy: []string{""},
})
Expand All @@ -473,7 +473,7 @@ func TestGroupAssets(t *testing.T) {
err = loadTestFixture(cli, esClient, "./testdata/search-test-fixture.json")
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop())
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), []string{"number", "id"})

type groupTest struct {
Description string
Expand Down
6 changes: 6 additions & 0 deletions internal/store/elasticsearch/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ var indexSettingsTemplate = `{
%q: {}
},
"settings": {
"similarity": {
"my_bm25_without_length_normalization": {
"type": "BM25",
"b": "0"
}
},
"index.mapping.ignore_malformed": true,
"analysis": {
"analyzer": {
Expand Down

0 comments on commit 9012b79

Please sign in to comment.