Skip to content

Commit

Permalink
feat: add sync asset handlers (#68)
Browse files Browse the repository at this point in the history
* feat: add sync asset handlers

---------

Co-authored-by: Hermawan Wijaya <[email protected]>
  • Loading branch information
batrov and batrov authored Dec 11, 2023
1 parent 455b3e6 commit f07825f
Show file tree
Hide file tree
Showing 38 changed files with 2,891 additions and 1,574 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "a6b2821e8ddd1127a63d3b376f860990d58931da"
PROTON_COMMIT := "eaca9798d1c1d7b3101ec1259c7e5fb949afba28"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
1 change: 1 addition & 0 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func runServer(ctx context.Context, cfg *Config) error {
wrkr, err := initAssetWorker(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: discoveryRepository,
AssetRepo: assetRepository,
Logger: logger,
})
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/internal/store/elasticsearch"
"github.com/goto/compass/internal/store/postgres"
"github.com/goto/compass/internal/workermanager"
"github.com/goto/compass/pkg/telemetry"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -67,11 +68,22 @@ func runWorker(ctx context.Context, cfg *Config) error {
return err
}

pgClient, err := initPostgres(ctx, logger, cfg)
if err != nil {
return err
}

assetRepository, err := postgres.NewAssetRepository(pgClient, nil, 0, cfg.Service.Identity.ProviderDefaultName)
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout,
strings.Split(cfg.ColSearchExclusionKeywords, ",")),
Logger: logger,
AssetRepo: assetRepository,
Logger: logger,
})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type DiscoveryRepository interface {
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
SyncAssets(ctx context.Context, indexName string) (cleanup func() error, err error)
}

// GroupConfig represents a group query along
Expand Down
55 changes: 55 additions & 0 deletions core/asset/mocks/discovery_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions core/asset/mocks/worker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Service struct {
type Worker interface {
EnqueueIndexAssetJob(ctx context.Context, ast Asset) error
EnqueueDeleteAssetJob(ctx context.Context, urn string) error
EnqueueSyncAssetJob(ctx context.Context, service string) error
Close() error
}

Expand Down Expand Up @@ -229,6 +230,15 @@ func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggesti
return s.discoveryRepository.Suggest(ctx, cfg)
}

func (s *Service) SyncAssets(ctx context.Context, services []string) error {
for _, service := range services {
if err := s.worker.EnqueueSyncAssetJob(ctx, service); err != nil {
return err
}
}
return nil
}

func (s *Service) instrumentAssetOp(ctx context.Context, op, id string, err error) {
identifier := "URN"
if isValidUUID(id) {
Expand Down
10 changes: 10 additions & 0 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type AssetService interface {
SuggestAssets(ctx context.Context, cfg asset.SearchConfig) (suggestions []string, err error)

AddProbe(ctx context.Context, assetURN string, probe *asset.Probe) error

SyncAssets(ctx context.Context, services []string) error
}

func (server *APIServer) GetAllAssets(ctx context.Context, req *compassv1beta1.GetAllAssetsRequest) (*compassv1beta1.GetAllAssetsResponse, error) {
Expand Down Expand Up @@ -344,6 +346,14 @@ func (server *APIServer) CreateAssetProbe(ctx context.Context, req *compassv1bet
}, nil
}

func (server *APIServer) SyncAssets(ctx context.Context, req *compassv1beta1.SyncAssetsRequest) (*compassv1beta1.SyncAssetsResponse, error) {
if err := server.assetService.SyncAssets(ctx, req.GetServices()); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return &compassv1beta1.SyncAssetsResponse{}, nil
}

func (server *APIServer) upsertAsset(
ctx context.Context,
ast asset.Asset,
Expand Down
61 changes: 61 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1684,3 +1684,64 @@ func newStructpb(t *testing.T, v map[string]interface{}) *structpb.Struct {

return res
}

func TestSyncAssets(t *testing.T) {
type testCase struct {
Description string
Request *compassv1beta1.SyncAssetsRequest
ExpectStatus codes.Code
Setup func(context.Context, *mocks.AssetService)
PostCheck func(resp *compassv1beta1.SyncAssetsResponse) error
}

testCases := []testCase{
{
Description: "should return internal server error",
ExpectStatus: codes.Internal,
Request: &compassv1beta1.SyncAssetsRequest{
Services: []string{"bigquery"},
},
Setup: func(ctx context.Context, as *mocks.AssetService) {
as.EXPECT().SyncAssets(mock.Anything, []string{"bigquery"}).Return(errors.New("any error"))
},
},

{
Description: "should return success",
ExpectStatus: codes.OK,
Request: &compassv1beta1.SyncAssetsRequest{
Services: []string{"bigquery"},
},
Setup: func(ctx context.Context, as *mocks.AssetService) {
as.EXPECT().SyncAssets(mock.Anything, []string{"bigquery"}).Return(nil)
},
},
}

for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
ctx := context.Background()

logger := log.NewNoop()
mockAssetSvc := mocks.NewAssetService(t)
if tc.Setup != nil {
tc.Setup(ctx, mockAssetSvc)
}

handler := NewAPIServer(APIServerDeps{AssetSvc: mockAssetSvc, Logger: logger})

got, err := handler.SyncAssets(ctx, tc.Request)
code := status.Code(err)
if code != tc.ExpectStatus {
t.Errorf("expected handler to return Code %s, returned Code %sinstead", tc.ExpectStatus.String(), code.String())
return
}
if tc.PostCheck != nil {
if err := tc.PostCheck(got); err != nil {
t.Error(err)
return
}
}
})
}
}
54 changes: 49 additions & 5 deletions internal/server/v1beta1/mocks/asset_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/server/v1beta1/mocks/discussion_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f07825f

Please sign in to comment.