diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index 9019a972..8d9252b2 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/goto/compass/core/asset" @@ -108,6 +109,10 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { for _, ast := range assets { if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + if strings.Contains(err.Error(), "illegal_argument_exception") { + m.logger.Error(err.Error()) + continue + } return err } } diff --git a/internal/workermanager/in_situ_worker.go b/internal/workermanager/in_situ_worker.go index f0067d79..0bb28e33 100644 --- a/internal/workermanager/in_situ_worker.go +++ b/internal/workermanager/in_situ_worker.go @@ -3,21 +3,25 @@ package workermanager import ( "context" "fmt" + "strings" "sync" "github.com/goto/compass/core/asset" + "github.com/goto/salt/log" ) type InSituWorker struct { discoveryRepo DiscoveryRepository assetRepo asset.Repository mutex sync.Mutex + logger log.Logger } func NewInSituWorker(deps Deps) *InSituWorker { return &InSituWorker{ discoveryRepo: deps.DiscoveryRepo, assetRepo: deps.AssetRepo, + logger: deps.Logger, } } @@ -61,6 +65,10 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) for _, ast := range assets { if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + if strings.Contains(err.Error(), "illegal_argument_exception") { + m.logger.Error(err.Error()) + continue + } return err } }