From ef0115f73cb506fae13ba8c5978c0e2a2eac9ad7 Mon Sep 17 00:00:00 2001 From: Hermawan Wijaya Date: Tue, 5 Dec 2023 14:41:04 +0700 Subject: [PATCH] refactor: move sync related methods as private --- core/asset/discovery.go | 5 +- core/asset/mocks/discovery_repository.go | 161 ++---------------- internal/server/v1beta1/asset.go | 1 - .../elasticsearch/discovery_repository.go | 156 ++++++++++++++--- internal/store/postgres/asset_repository.go | 1 - internal/workermanager/discovery_worker.go | 41 +---- internal/workermanager/in_situ_worker.go | 36 +--- .../mocks/discovery_repository_mock.go | 161 ++---------------- pkg/worker/pgq/pgq_processor.go | 2 - 9 files changed, 163 insertions(+), 401 deletions(-) diff --git a/core/asset/discovery.go b/core/asset/discovery.go index beaef9b5..8b1fa915 100644 --- a/core/asset/discovery.go +++ b/core/asset/discovery.go @@ -12,10 +12,7 @@ type DiscoveryRepository interface { Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error) Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error) GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error) - Clone(ctx context.Context, indexName, clonedIndexName string) error - UpdateAlias(ctx context.Context, indexName, alias string) error - DeleteByIndexName(ctx context.Context, indexName string) error - UpdateIndexSettings(ctx context.Context, indexName string, body string) error + SyncAssets(ctx context.Context, indexName string, assets []Asset) error } // GroupConfig represents a group query along diff --git a/core/asset/mocks/discovery_repository.go b/core/asset/mocks/discovery_repository.go index 6ea91981..d622e81b 100644 --- a/core/asset/mocks/discovery_repository.go +++ b/core/asset/mocks/discovery_repository.go @@ -23,50 +23,6 @@ func (_m *DiscoveryRepository) EXPECT() *DiscoveryRepository_Expecter { return &DiscoveryRepository_Expecter{mock: &_m.Mock} } -// Clone provides a mock function with given fields: ctx, indexName, clonedIndexName -func (_m *DiscoveryRepository) Clone(ctx context.Context, indexName string, clonedIndexName string) error { - ret := _m.Called(ctx, indexName, clonedIndexName) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, clonedIndexName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_Clone_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Clone' -type DiscoveryRepository_Clone_Call struct { - *mock.Call -} - -// Clone is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -// - clonedIndexName string -func (_e *DiscoveryRepository_Expecter) Clone(ctx interface{}, indexName interface{}, clonedIndexName interface{}) *DiscoveryRepository_Clone_Call { - return &DiscoveryRepository_Clone_Call{Call: _e.mock.On("Clone", ctx, indexName, clonedIndexName)} -} - -func (_c *DiscoveryRepository_Clone_Call) Run(run func(ctx context.Context, indexName string, clonedIndexName string)) *DiscoveryRepository_Clone_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_Clone_Call) Return(_a0 error) *DiscoveryRepository_Clone_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_Clone_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_Clone_Call { - _c.Call.Return(run) - return _c -} - // DeleteByID provides a mock function with given fields: ctx, assetID func (_m *DiscoveryRepository) DeleteByID(ctx context.Context, assetID string) error { ret := _m.Called(ctx, assetID) @@ -110,49 +66,6 @@ func (_c *DiscoveryRepository_DeleteByID_Call) RunAndReturn(run func(context.Con return _c } -// DeleteByIndexName provides a mock function with given fields: ctx, indexName -func (_m *DiscoveryRepository) DeleteByIndexName(ctx context.Context, indexName string) error { - ret := _m.Called(ctx, indexName) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, indexName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_DeleteByIndexName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteByIndexName' -type DiscoveryRepository_DeleteByIndexName_Call struct { - *mock.Call -} - -// DeleteByIndexName is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -func (_e *DiscoveryRepository_Expecter) DeleteByIndexName(ctx interface{}, indexName interface{}) *DiscoveryRepository_DeleteByIndexName_Call { - return &DiscoveryRepository_DeleteByIndexName_Call{Call: _e.mock.On("DeleteByIndexName", ctx, indexName)} -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) Run(run func(ctx context.Context, indexName string)) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) Return(_a0 error) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) RunAndReturn(run func(context.Context, string) error) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Return(run) - return _c -} - // DeleteByURN provides a mock function with given fields: ctx, assetURN func (_m *DiscoveryRepository) DeleteByURN(ctx context.Context, assetURN string) error { ret := _m.Called(ctx, assetURN) @@ -361,57 +274,13 @@ func (_c *DiscoveryRepository_Suggest_Call) RunAndReturn(run func(context.Contex return _c } -// UpdateAlias provides a mock function with given fields: ctx, indexName, alias -func (_m *DiscoveryRepository) UpdateAlias(ctx context.Context, indexName string, alias string) error { - ret := _m.Called(ctx, indexName, alias) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, alias) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_UpdateAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateAlias' -type DiscoveryRepository_UpdateAlias_Call struct { - *mock.Call -} - -// UpdateAlias is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -// - alias string -func (_e *DiscoveryRepository_Expecter) UpdateAlias(ctx interface{}, indexName interface{}, alias interface{}) *DiscoveryRepository_UpdateAlias_Call { - return &DiscoveryRepository_UpdateAlias_Call{Call: _e.mock.On("UpdateAlias", ctx, indexName, alias)} -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) Run(run func(ctx context.Context, indexName string, alias string)) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) Return(_a0 error) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Return(run) - return _c -} - -// UpdateIndexSettings provides a mock function with given fields: ctx, indexName, body -func (_m *DiscoveryRepository) UpdateIndexSettings(ctx context.Context, indexName string, body string) error { - ret := _m.Called(ctx, indexName, body) +// SyncAssets provides a mock function with given fields: ctx, indexName, assets +func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error { + ret := _m.Called(ctx, indexName, assets) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, body) + if rf, ok := ret.Get(0).(func(context.Context, string, []asset.Asset) error); ok { + r0 = rf(ctx, indexName, assets) } else { r0 = ret.Error(0) } @@ -419,32 +288,32 @@ func (_m *DiscoveryRepository) UpdateIndexSettings(ctx context.Context, indexNam return r0 } -// DiscoveryRepository_UpdateIndexSettings_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateIndexSettings' -type DiscoveryRepository_UpdateIndexSettings_Call struct { +// DiscoveryRepository_SyncAssets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAssets' +type DiscoveryRepository_SyncAssets_Call struct { *mock.Call } -// UpdateIndexSettings is a helper method to define mock.On call +// SyncAssets is a helper method to define mock.On call // - ctx context.Context // - indexName string -// - body string -func (_e *DiscoveryRepository_Expecter) UpdateIndexSettings(ctx interface{}, indexName interface{}, body interface{}) *DiscoveryRepository_UpdateIndexSettings_Call { - return &DiscoveryRepository_UpdateIndexSettings_Call{Call: _e.mock.On("UpdateIndexSettings", ctx, indexName, body)} +// - assets []asset.Asset +func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}, assets interface{}) *DiscoveryRepository_SyncAssets_Call { + return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName, assets)} } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) Run(run func(ctx context.Context, indexName string, body string)) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string, assets []asset.Asset)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) + run(args[0].(context.Context), args[1].(string), args[2].([]asset.Asset)) }) return _c } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) Return(_a0 error) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Return(_a0 error) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(_a0) return _c } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string, []asset.Asset) error) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(run) return _c } diff --git a/internal/server/v1beta1/asset.go b/internal/server/v1beta1/asset.go index 99dd82ac..f957235e 100644 --- a/internal/server/v1beta1/asset.go +++ b/internal/server/v1beta1/asset.go @@ -347,7 +347,6 @@ func (server *APIServer) CreateAssetProbe(ctx context.Context, req *compassv1bet } func (server *APIServer) SyncAssets(ctx context.Context, req *compassv1beta1.SyncAssetsRequest) (*compassv1beta1.SyncAssetsResponse, error) { - server.assetService.SyncAssets(ctx, req.GetServices()) return nil, nil diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index 62c78e6e..a0025d6e 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -65,43 +65,47 @@ func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) er return repo.indexAsset(ctx, ast) } -func (repo *DiscoveryRepository) Clone(ctx context.Context, indexName string, clonedIndexName string) error { - err := repo.UpdateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`) +func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, asts []asset.Asset) error { + backupIndexName := fmt.Sprintf("%+v-bak", indexName) + + err := repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":true}}`) if err != nil { return err } - defer func() { - err := repo.UpdateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":true}}`) - if err != nil { - - } - }() - cloneFn := repo.cli.client.Indices.Clone - _, err = cloneFn(indexName, clonedIndexName, cloneFn.WithContext(ctx)) + err = repo.clone(ctx, indexName, backupIndexName) + if err != nil { + return err + } - return err -} + err = repo.updateAlias(ctx, backupIndexName, "universe") + if err != nil { + return err + } -func (repo *DiscoveryRepository) UpdateAlias(ctx context.Context, indexName, alias string) error { - _, err := repo.cli.client.Indices.PutAlias([]string{indexName}, alias) - return err -} + err = repo.deleteByIndexName(ctx, indexName) + if err != nil { + return err + } -func (repo *DiscoveryRepository) DeleteByIndexName(ctx context.Context, indexName string) error { - deleteFn := repo.cli.client.Indices.Delete - _, err := deleteFn([]string{indexName}, deleteFn.WithContext(ctx)) - return err -} + for _, ast := range asts { + err = repo.Upsert(ctx, ast) + if err != nil { + return err + } + } -func (repo *DiscoveryRepository) UpdateIndexSettings(ctx context.Context, indexName string, body string) error { - putSettings := repo.cli.client.Indices.PutSettings + err = repo.deleteByIndexName(ctx, backupIndexName) + if err != nil { + return err + } - _, err := putSettings(strings.NewReader(body), - putSettings.WithIndex(indexName), - putSettings.WithContext(ctx)) + err = repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`) + if err != nil { + return err + } - return err + return nil } func (repo *DiscoveryRepository) DeleteByID(ctx context.Context, assetID string) error { @@ -218,3 +222,101 @@ func createUpsertBody(ast asset.Asset) (io.Reader, error) { return &buf, nil } + +func (repo *DiscoveryRepository) clone(ctx context.Context, indexName, clonedIndexName string) error { + cloneFn := repo.cli.client.Indices.Clone + resp, err := cloneFn(indexName, clonedIndexName, cloneFn.WithContext(ctx)) + if err != nil { + return asset.DiscoveryError{ + Op: "CloneDoc", + Index: indexName, + Err: err, + } + } + + if resp.IsError() { + code, reason := errorCodeAndReason(resp) + return asset.DiscoveryError{ + Op: "CloneDoc", + Index: indexName, + ESCode: code, + Err: errors.New(reason), + } + } + + return nil +} + +func (repo *DiscoveryRepository) updateAlias(ctx context.Context, indexName, alias string) error { + putAliasFn := repo.cli.client.Indices.PutAlias + resp, err := putAliasFn([]string{indexName}, alias, putAliasFn.WithContext(ctx)) + if err != nil { + return asset.DiscoveryError{ + Op: "UpdateAlias", + Index: indexName, + Err: err, + } + } + + if resp.IsError() { + code, reason := errorCodeAndReason(resp) + return asset.DiscoveryError{ + Op: "UpdateAlias", + Index: indexName, + ESCode: code, + Err: errors.New(reason), + } + } + return nil +} + +func (repo *DiscoveryRepository) deleteByIndexName(ctx context.Context, indexName string) error { + deleteFn := repo.cli.client.Indices.Delete + resp, err := deleteFn([]string{indexName}, deleteFn.WithContext(ctx)) + if err != nil { + return asset.DiscoveryError{ + Op: "DeleteIndex", + Index: indexName, + Err: err, + } + } + + if resp.IsError() { + code, reason := errorCodeAndReason(resp) + return asset.DiscoveryError{ + Op: "DeleteIndex", + Index: indexName, + ESCode: code, + Err: errors.New(reason), + } + } + + return nil +} + +func (repo *DiscoveryRepository) updateIndexSettings(ctx context.Context, indexName, body string) error { + putSettings := repo.cli.client.Indices.PutSettings + + resp, err := putSettings(strings.NewReader(body), + putSettings.WithIndex(indexName), + putSettings.WithContext(ctx)) + if err != nil { + return asset.DiscoveryError{ + Op: "UpdateSettings", + Index: indexName, + Err: err, + } + } + + if resp.IsError() { + code, reason := errorCodeAndReason(resp) + return asset.DiscoveryError{ + Op: "UpdateSettings", + Index: indexName, + ESCode: code, + Err: errors.New(reason), + } + } + + return err +} diff --git a/internal/store/postgres/asset_repository.go b/internal/store/postgres/asset_repository.go index f0c36c9d..69440246 100644 --- a/internal/store/postgres/asset_repository.go +++ b/internal/store/postgres/asset_repository.go @@ -312,7 +312,6 @@ func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (string, return nil }) - if err != nil { return "", err } diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index c4b5c66f..d14dfea7 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -15,10 +15,7 @@ import ( type DiscoveryRepository interface { Upsert(context.Context, asset.Asset) error DeleteByURN(ctx context.Context, assetURN string) error - Clone(ctx context.Context, indexName, clonedIndexName string) error - UpdateAlias(ctx context.Context, indexName, alias string) error - DeleteByIndexName(ctx context.Context, indexName string) error - UpdateIndexSettings(ctx context.Context, indexName string, body string) error + SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error } func (m *Manager) EnqueueIndexAssetJob(ctx context.Context, ast asset.Asset) error { @@ -93,23 +90,6 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { } } - backupIndexName := fmt.Sprintf("%+v-bak", service) - - err = m.discoveryRepo.Clone(ctx, service, backupIndexName) - if err != nil { - return fmt.Errorf("sync asset: clone index: %w", err) - } - - err = m.discoveryRepo.UpdateAlias(ctx, backupIndexName, "universe") - if err != nil { - return fmt.Errorf("sync asset: update alias: %w", err) - } - - err = m.discoveryRepo.DeleteByIndexName(ctx, service) - if err != nil { - return fmt.Errorf("sync asset: delete index: %w", err) - } - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ Services: []string{service}, }) @@ -117,24 +97,7 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { return fmt.Errorf("sync asset: get assets: %w", err) } - for _, asset := range assets { - err = m.discoveryRepo.Upsert(ctx, asset) - if err != nil { - return fmt.Errorf("sync asset: upsert assets in ES: %w", err) - } - } - - err = m.discoveryRepo.DeleteByIndexName(ctx, backupIndexName) - if err != nil { - return fmt.Errorf("sync asset: delete index: %w", err) - } - - err = m.discoveryRepo.UpdateIndexSettings(ctx, service, `{"settings":{"index.blocks.write":false}}`) - if err != nil { - return fmt.Errorf("sync asset: update index settings: %w", err) - } - - return nil + return m.discoveryRepo.SyncAssets(ctx, service, assets) } func (m *Manager) EnqueueDeleteAssetJob(ctx context.Context, urn string) error { diff --git a/internal/workermanager/in_situ_worker.go b/internal/workermanager/in_situ_worker.go index b42876f1..40a633f4 100644 --- a/internal/workermanager/in_situ_worker.go +++ b/internal/workermanager/in_situ_worker.go @@ -49,23 +49,6 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) } } - backupIndexName := fmt.Sprintf("%+v-bak", service) - - err = m.discoveryRepo.Clone(ctx, service, backupIndexName) - if err != nil { - return fmt.Errorf("sync asset: clone index: %w", err) - } - - err = m.discoveryRepo.UpdateAlias(ctx, backupIndexName, "universe") - if err != nil { - return fmt.Errorf("sync asset: update alias: %w", err) - } - - err = m.discoveryRepo.DeleteByIndexName(ctx, service) - if err != nil { - return fmt.Errorf("sync asset: delete index: %w", err) - } - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ Services: []string{service}, }) @@ -73,24 +56,7 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) return fmt.Errorf("sync asset: get assets: %w", err) } - for _, asset := range assets { - err = m.discoveryRepo.Upsert(ctx, asset) - if err != nil { - return fmt.Errorf("sync asset: upsert assets in ES: %w", err) - } - } - - err = m.discoveryRepo.DeleteByIndexName(ctx, backupIndexName) - if err != nil { - return fmt.Errorf("sync asset: delete index: %w", err) - } - - err = m.discoveryRepo.UpdateIndexSettings(ctx, service, `{"settings":{"index.blocks.write":false}}`) - if err != nil { - return fmt.Errorf("sync asset: update index settings: %w", err) - } - - return nil + return m.discoveryRepo.SyncAssets(ctx, service, assets) } func (*InSituWorker) Close() error { return nil } diff --git a/internal/workermanager/mocks/discovery_repository_mock.go b/internal/workermanager/mocks/discovery_repository_mock.go index 0af474a1..f4cc1e93 100644 --- a/internal/workermanager/mocks/discovery_repository_mock.go +++ b/internal/workermanager/mocks/discovery_repository_mock.go @@ -23,93 +23,6 @@ func (_m *DiscoveryRepository) EXPECT() *DiscoveryRepository_Expecter { return &DiscoveryRepository_Expecter{mock: &_m.Mock} } -// Clone provides a mock function with given fields: ctx, indexName, clonedIndexName -func (_m *DiscoveryRepository) Clone(ctx context.Context, indexName string, clonedIndexName string) error { - ret := _m.Called(ctx, indexName, clonedIndexName) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, clonedIndexName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_Clone_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Clone' -type DiscoveryRepository_Clone_Call struct { - *mock.Call -} - -// Clone is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -// - clonedIndexName string -func (_e *DiscoveryRepository_Expecter) Clone(ctx interface{}, indexName interface{}, clonedIndexName interface{}) *DiscoveryRepository_Clone_Call { - return &DiscoveryRepository_Clone_Call{Call: _e.mock.On("Clone", ctx, indexName, clonedIndexName)} -} - -func (_c *DiscoveryRepository_Clone_Call) Run(run func(ctx context.Context, indexName string, clonedIndexName string)) *DiscoveryRepository_Clone_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_Clone_Call) Return(_a0 error) *DiscoveryRepository_Clone_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_Clone_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_Clone_Call { - _c.Call.Return(run) - return _c -} - -// DeleteByIndexName provides a mock function with given fields: ctx, indexName -func (_m *DiscoveryRepository) DeleteByIndexName(ctx context.Context, indexName string) error { - ret := _m.Called(ctx, indexName) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, indexName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_DeleteByIndexName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteByIndexName' -type DiscoveryRepository_DeleteByIndexName_Call struct { - *mock.Call -} - -// DeleteByIndexName is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -func (_e *DiscoveryRepository_Expecter) DeleteByIndexName(ctx interface{}, indexName interface{}) *DiscoveryRepository_DeleteByIndexName_Call { - return &DiscoveryRepository_DeleteByIndexName_Call{Call: _e.mock.On("DeleteByIndexName", ctx, indexName)} -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) Run(run func(ctx context.Context, indexName string)) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) Return(_a0 error) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_DeleteByIndexName_Call) RunAndReturn(run func(context.Context, string) error) *DiscoveryRepository_DeleteByIndexName_Call { - _c.Call.Return(run) - return _c -} - // DeleteByURN provides a mock function with given fields: ctx, assetURN func (_m *DiscoveryRepository) DeleteByURN(ctx context.Context, assetURN string) error { ret := _m.Called(ctx, assetURN) @@ -153,57 +66,13 @@ func (_c *DiscoveryRepository_DeleteByURN_Call) RunAndReturn(run func(context.Co return _c } -// UpdateAlias provides a mock function with given fields: ctx, indexName, alias -func (_m *DiscoveryRepository) UpdateAlias(ctx context.Context, indexName string, alias string) error { - ret := _m.Called(ctx, indexName, alias) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, alias) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DiscoveryRepository_UpdateAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateAlias' -type DiscoveryRepository_UpdateAlias_Call struct { - *mock.Call -} - -// UpdateAlias is a helper method to define mock.On call -// - ctx context.Context -// - indexName string -// - alias string -func (_e *DiscoveryRepository_Expecter) UpdateAlias(ctx interface{}, indexName interface{}, alias interface{}) *DiscoveryRepository_UpdateAlias_Call { - return &DiscoveryRepository_UpdateAlias_Call{Call: _e.mock.On("UpdateAlias", ctx, indexName, alias)} -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) Run(run func(ctx context.Context, indexName string, alias string)) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) Return(_a0 error) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DiscoveryRepository_UpdateAlias_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_UpdateAlias_Call { - _c.Call.Return(run) - return _c -} - -// UpdateIndexSettings provides a mock function with given fields: ctx, indexName, body -func (_m *DiscoveryRepository) UpdateIndexSettings(ctx context.Context, indexName string, body string) error { - ret := _m.Called(ctx, indexName, body) +// SyncAssets provides a mock function with given fields: ctx, indexName, assets +func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error { + ret := _m.Called(ctx, indexName, assets) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, indexName, body) + if rf, ok := ret.Get(0).(func(context.Context, string, []asset.Asset) error); ok { + r0 = rf(ctx, indexName, assets) } else { r0 = ret.Error(0) } @@ -211,32 +80,32 @@ func (_m *DiscoveryRepository) UpdateIndexSettings(ctx context.Context, indexNam return r0 } -// DiscoveryRepository_UpdateIndexSettings_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateIndexSettings' -type DiscoveryRepository_UpdateIndexSettings_Call struct { +// DiscoveryRepository_SyncAssets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAssets' +type DiscoveryRepository_SyncAssets_Call struct { *mock.Call } -// UpdateIndexSettings is a helper method to define mock.On call +// SyncAssets is a helper method to define mock.On call // - ctx context.Context // - indexName string -// - body string -func (_e *DiscoveryRepository_Expecter) UpdateIndexSettings(ctx interface{}, indexName interface{}, body interface{}) *DiscoveryRepository_UpdateIndexSettings_Call { - return &DiscoveryRepository_UpdateIndexSettings_Call{Call: _e.mock.On("UpdateIndexSettings", ctx, indexName, body)} +// - assets []asset.Asset +func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}, assets interface{}) *DiscoveryRepository_SyncAssets_Call { + return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName, assets)} } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) Run(run func(ctx context.Context, indexName string, body string)) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string, assets []asset.Asset)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) + run(args[0].(context.Context), args[1].(string), args[2].([]asset.Asset)) }) return _c } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) Return(_a0 error) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Return(_a0 error) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(_a0) return _c } -func (_c *DiscoveryRepository_UpdateIndexSettings_Call) RunAndReturn(run func(context.Context, string, string) error) *DiscoveryRepository_UpdateIndexSettings_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string, []asset.Asset) error) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(run) return _c } diff --git a/pkg/worker/pgq/pgq_processor.go b/pkg/worker/pgq/pgq_processor.go index 94176ffa..100ff835 100644 --- a/pkg/worker/pgq/pgq_processor.go +++ b/pkg/worker/pgq/pgq_processor.go @@ -108,8 +108,6 @@ func (p *Processor) Process(ctx context.Context, types []string, fn worker.JobEx return fmt.Errorf("pickup job: %w", err) } - // check for dupes, only for sync asset jobs - resultJob := fn(ctx, job) switch resultJob.Status { case worker.StatusDone: