Skip to content

Commit

Permalink
feat: add sync asset handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
batrov committed Dec 4, 2023
1 parent 455b3e6 commit 9c67201
Show file tree
Hide file tree
Showing 30 changed files with 2,818 additions and 1,550 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "a6b2821e8ddd1127a63d3b376f860990d58931da"
PROTON_COMMIT := "eaca9798d1c1d7b3101ec1259c7e5fb949afba28"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
7 changes: 7 additions & 0 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,16 @@ func runServer(ctx context.Context, cfg *Config) error {
return fmt.Errorf("create new lineage repository: %w", err)
}

jobRepository, err := postgres.NewJobRepository(pgClient)
if err != nil {
return fmt.Errorf("create new job repository: %w", err)
}

wrkr, err := initAssetWorker(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: discoveryRepository,
AssetRepo: assetRepository,
JobRepo: jobRepository,
Logger: logger,
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type DiscoveryRepository interface {
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
Clone(ctx context.Context, indexName, clonedIndexName string) error
UpdateAlias(ctx context.Context, indexName, alias string) error
DeleteByIndexName(ctx context.Context, indexName string) error
UpdateIndexSettings(ctx context.Context, indexName string, body string) error

Check failure on line 18 in core/asset/discovery.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
}

// GroupConfig represents a group query along
Expand Down
175 changes: 175 additions & 0 deletions core/asset/mocks/discovery_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions core/asset/mocks/worker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Service struct {
type Worker interface {
EnqueueIndexAssetJob(ctx context.Context, ast Asset) error
EnqueueDeleteAssetJob(ctx context.Context, urn string) error
EnqueueSyncAssetJob(ctx context.Context, services []string) error
Close() error
}

Expand Down Expand Up @@ -229,6 +230,22 @@ func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggesti
return s.discoveryRepository.Suggest(ctx, cfg)
}

func (s *Service) SyncAssets(ctx context.Context, services []string) error {
// select from job queue

// bad request, job already there

// if not, insert

// recreate index logic

// validate

s.worker.EnqueueSyncAssetJob(ctx, services)

Check failure on line 244 in core/asset/service.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `s.worker.EnqueueSyncAssetJob` is not checked (errcheck)

return nil
}

func (s *Service) instrumentAssetOp(ctx context.Context, op, id string, err error) {
identifier := "URN"
if isValidUUID(id) {
Expand Down
22 changes: 22 additions & 0 deletions core/job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package job

import (
"context"
"time"
)

type Repository interface {
GetSyncJobsByService(ctx context.Context, serviceName string) ([]JobsQueue, error)
}

type JobsQueue struct {
ID string `db:"id"`
Type string `db:"type"`
LastError string `db:"last_error"`
AttemptsDo int32 `db:"attempts_do"`
Payload []byte `db:"payload"`
RunAt time.Time `db:"run_at"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
LastAttempt time.Time `db:"last_attempt"`
}
18 changes: 18 additions & 0 deletions core/job/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package job

import "context"

func NewService(jobRepository Repository) *Service {
return &Service{
jobRepository: jobRepository,
}
}

type Service struct {
jobRepository Repository
}

// GetSyncJobsByService handles business process to get tags by its asset id
func (s *Service) GetSyncJobsByService(ctx context.Context, serviceName string) ([]JobsQueue, error) {
return s.jobRepository.GetSyncJobsByService(ctx, serviceName)
}
9 changes: 9 additions & 0 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type AssetService interface {
SuggestAssets(ctx context.Context, cfg asset.SearchConfig) (suggestions []string, err error)

AddProbe(ctx context.Context, assetURN string, probe *asset.Probe) error

SyncAssets(ctx context.Context, services []string) error
}

func (server *APIServer) GetAllAssets(ctx context.Context, req *compassv1beta1.GetAllAssetsRequest) (*compassv1beta1.GetAllAssetsResponse, error) {
Expand Down Expand Up @@ -344,6 +346,13 @@ func (server *APIServer) CreateAssetProbe(ctx context.Context, req *compassv1bet
}, nil
}

func (server *APIServer) SyncAssets(ctx context.Context, req *compassv1beta1.SyncAssetsRequest) (*compassv1beta1.SyncAssetsResponse, error) {

Check failure on line 350 in internal/server/v1beta1/asset.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
server.assetService.SyncAssets(ctx, req.GetServices())

Check failure on line 351 in internal/server/v1beta1/asset.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `server.assetService.SyncAssets` is not checked (errcheck)

return nil, nil
}

func (server *APIServer) upsertAsset(
ctx context.Context,
ast asset.Asset,
Expand Down
Loading

0 comments on commit 9c67201

Please sign in to comment.