Skip to content

Commit

Permalink
feat: add batching and cleanup mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
batrov committed Dec 6, 2023
1 parent bb08dba commit e6109d3
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 159 deletions.
2 changes: 1 addition & 1 deletion core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DiscoveryRepository interface {
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
SyncAssets(ctx context.Context, indexName string, assets []Asset) error
SyncAssets(ctx context.Context, indexName string) (cleanup func() error, err error)
}

// GroupConfig represents a group query along
Expand Down
43 changes: 27 additions & 16 deletions core/asset/mocks/discovery_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 42 additions & 30 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,79 +34,91 @@ func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.
}
}

func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) error {
if ast.ID == "" {
return asset.ErrEmptyID
}
if !ast.Type.IsValid() {
return asset.ErrUnknownType
}

idxExists, err := repo.cli.indexExists(ctx, "Upsert", ast.Service)
func (repo *DiscoveryRepository) createIndex(ctx context.Context, discoveryOp, indexName, alias string) error {
idxExists, err := repo.cli.indexExists(ctx, discoveryOp, indexName)
if err != nil {
return asset.DiscoveryError{
Op: "IndexExists",
ID: ast.ID,
Index: ast.Service,
Index: indexName,
Err: err,
}
}

if !idxExists {
if err := repo.cli.CreateIdx(ctx, "Upsert", ast.Service); err != nil {
if err := repo.cli.CreateIdx(ctx, discoveryOp, indexName, alias); err != nil {
return asset.DiscoveryError{
Op: "CreateIndex",
ID: ast.ID,
Index: ast.Service,
Index: indexName,
Err: err,
}
}
}

return nil
}

func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) error {
if ast.ID == "" {
return asset.ErrEmptyID
}
if !ast.Type.IsValid() {
return asset.ErrUnknownType
}

if err := repo.createIndex(ctx, "Upsert", ast.Service, defaultSearchIndex); err != nil {
return err
}

return repo.indexAsset(ctx, ast)
}

func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, asts []asset.Asset) error {
func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string) (func() error, error) {
backupIndexName := fmt.Sprintf("%+v-bak", indexName)

err := repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":true}}`)
if err != nil {
return err
return nil, err
}

err = repo.clone(ctx, indexName, backupIndexName)
if err != nil {
return err
return nil, err
}

err = repo.updateAlias(ctx, backupIndexName, "universe")
if err != nil {
return err
return nil, err
}

err = repo.deleteByIndexName(ctx, indexName)
if err != nil {
return err
return nil, err
}

for _, ast := range asts {
err = repo.Upsert(ctx, ast)
err = repo.createIndex(ctx, "SyncAssets", indexName, "")
if err != nil {
return nil, err
}

cleanup := func() error {
err = repo.updateAlias(ctx, indexName, "universe")
if err != nil {
return err
}
}

err = repo.deleteByIndexName(ctx, backupIndexName)
if err != nil {
return err
}
err = repo.deleteByIndexName(ctx, backupIndexName)
if err != nil {
return err
}

err = repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`)
if err != nil {
return err
err = repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`)
if err != nil {
return err
}
return nil
}

return nil
return cleanup, err
}

func (repo *DiscoveryRepository) DeleteByID(ctx context.Context, assetID string) error {
Expand Down
27 changes: 5 additions & 22 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,27 +398,6 @@ func TestDiscoveryRepository_SyncAssets(t *testing.T) {
indexName = "bigquery-test"
)

ast1 := asset.Asset{
ID: "id1",
Type: asset.TypeTable,
Service: indexName,
URN: "urn1",
}
ast2 := asset.Asset{
ID: "id2",
Type: asset.TypeTable,
Service: indexName,
URN: "urn2",
}
ast3 := asset.Asset{
ID: "id3",
Type: asset.TypeTable,
Service: indexName,
URN: "urn3",
}

assets := []asset.Asset{ast1, ast2, ast3}

cli, err := esTestServer.NewClient()
require.NoError(t, err)

Expand All @@ -434,7 +413,11 @@ func TestDiscoveryRepository_SyncAssets(t *testing.T) {

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.SyncAssets(ctx, indexName, assets)
_, err = repo.SyncAssets(ctx, indexName)
require.NoError(t, err)

alias := cli.Indices.GetAlias
resp, _ := alias(alias.WithIndex(indexName))
require.NotEmpty(t, resp)
})
}
16 changes: 12 additions & 4 deletions internal/store/elasticsearch/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *Client) Init() (string, error) {
return fmt.Sprintf("%q (server version %s)", info.ClusterName, info.Version.Number), nil
}

func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) (err error) {
func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName, alias string) (err error) {
defer func(start time.Time) {
const op = "create_index"
c.instrumentOp(ctx, instrumentParams{
Expand All @@ -181,7 +181,7 @@ func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) (
})
}(time.Now())

indexSettings := buildTypeIndexSettings()
indexSettings := buildTypeIndexSettings(alias)
res, err := c.client.Indices.Create(
indexName,
c.client.Indices.Create.WithBody(strings.NewReader(indexSettings)),
Expand All @@ -207,8 +207,16 @@ func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) (
return nil
}

func buildTypeIndexSettings() string {
return fmt.Sprintf(indexSettingsTemplate, serviceIndexMapping, defaultSearchIndex)
func buildTypeIndexSettings(alias string) string {
var aliasObj string

if len(alias) > 0 {
aliasObj = fmt.Sprintf(`"aliases": {
%q: {}
},`, alias)
}

return fmt.Sprintf(indexSettingsTemplate, serviceIndexMapping, aliasObj)
}

// checks for the existence of an index
Expand Down
2 changes: 1 addition & 1 deletion internal/store/elasticsearch/es_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestElasticsearch(t *testing.T) {
require.NoError(t, err)
_, err = esClient.Init()
assert.NoError(t, err)
err = esClient.CreateIdx(ctx, "", testCase.Service)
err = esClient.CreateIdx(ctx, "", testCase.Service, "universe")
if testCase.ShouldFail {
assert.Error(t, err)
return
Expand Down
4 changes: 1 addition & 3 deletions internal/store/elasticsearch/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ package elasticsearch
// and sets up the camelcase analyzer
var indexSettingsTemplate = `{
"mappings": %s,
"aliases": {
%q: {}
},
%s
"settings": {
"similarity": {
"my_bm25_without_length_normalization": {
Expand Down
56 changes: 23 additions & 33 deletions internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type DiscoveryRepository interface {
Upsert(context.Context, asset.Asset) error
DeleteByURN(ctx context.Context, assetURN string) error
SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error
SyncAssets(ctx context.Context, indexName string) (cleanup func() error, err error)
}

func (m *Manager) EnqueueIndexAssetJob(ctx context.Context, ast asset.Asset) error {
Expand Down Expand Up @@ -88,47 +88,37 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
}
}

assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
SortBy: "name",
})
cleanup, err := m.discoveryRepo.SyncAssets(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil {
return err
}

if len(assets) == batchSize { // do remaining upsert after first batch completed
it := 1

for {
assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
Offset: it * batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

for _, ast := range assets {
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
return err
}
}
it := 0

for {
assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
Offset: it * batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

if len(assets) != batchSize {
break
for _, ast := range assets {
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
return err
}
it++
}

if len(assets) != batchSize {
break
}
it++
}
return nil

return cleanup()
}

func (m *Manager) EnqueueDeleteAssetJob(ctx context.Context, urn string) error {
Expand Down
Loading

0 comments on commit e6109d3

Please sign in to comment.