Skip to content

Commit

Permalink
feat(job): only upsert updated resources into the db (#184)
Browse files Browse the repository at this point in the history
* feat: improve fetch resource job

* feat: improve fetch resource job

* feat: improve fetch resource job

* feat: improve fetch resource job

* feat: improve the logs

* fix: exclude parentID and children from diff

* fix: logs

* fix: resolve comments

* fix: resolve comments

---------

Co-authored-by: Lifosmin Simon <[email protected]>
  • Loading branch information
lifosmin and Lifosmin Simon authored Nov 18, 2024
1 parent 44b1d22 commit d71ef25
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 25 deletions.
28 changes: 28 additions & 0 deletions core/provider/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package provider

import (
"context"
"encoding/json"
"fmt"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/goto/guardian/domain"
)

Expand Down Expand Up @@ -37,6 +40,31 @@ func (m PermissionManager) GetPermissions(pc *domain.ProviderConfig, resourceTyp
return nil, ErrInvalidResourceType
}

func normalizeDetails(details map[string]interface{}) (map[string]interface{}, error) {
jsonData, err := json.Marshal(details)
if err != nil {
return nil, err
}

var normalized map[string]interface{}
if err := json.Unmarshal(jsonData, &normalized); err != nil {
return nil, err
}

return normalized, nil
}

func compareResources(existingResource, newResource domain.Resource) (bool, string) {
opts := cmp.Options{
cmpopts.IgnoreFields(domain.Resource{}, "ID", "CreatedAt", "UpdatedAt", "ParentID", "Children"),
cmpopts.EquateEmpty(),
}
existingResource.Details, _ = normalizeDetails(existingResource.Details)
newResource.Details, _ = normalizeDetails(newResource.Details)
diff := cmp.Diff(existingResource, newResource, opts)
return diff != "", diff
}

type UnimplementedClient struct{}

func (c *UnimplementedClient) CreateConfig(*domain.ProviderConfig) error {
Expand Down
65 changes: 41 additions & 24 deletions core/provider/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *Service) Create(ctx context.Context, p *domain.Provider) error {
go func() {
s.logger.Info(ctx, "provider create fetching resources", "provider_urn", p.URN)
ctx := audit.WithActor(context.Background(), domain.SystemActorName)
resources, err := s.getResources(ctx, p)
resources, _, err := s.fetchNewResources(ctx, p)
if err != nil {
s.logger.Error(ctx, "failed to fetch resources", "error", err)
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *Service) Update(ctx context.Context, p *domain.Provider) error {
go func() {
s.logger.Info(ctx, "provider update fetching resources", "provider_urn", p.URN)
ctx := audit.WithActor(context.Background(), domain.SystemActorName)
resources, err := s.getResources(ctx, p)
resources, _, err := s.fetchNewResources(ctx, p)
if err != nil {
s.logger.Error(ctx, "failed to fetch resources", "error", err)
}
Expand All @@ -260,24 +260,31 @@ func (s *Service) FetchResources(ctx context.Context) error {
if err != nil {
return err
}

failedProviders := map[string]error{}
totalFetchedResourcesCount := 0
updatedResourcesCount := 0
for _, p := range providers {
startTime := time.Now()
s.logger.Info(ctx, "fetching resources", "provider_urn", p.URN)
resources, err := s.getResources(ctx, p)
resources, fetchedResourcesCount, err := s.fetchNewResources(ctx, p)
if err != nil {
s.logger.Error(ctx, "failed to get resources", "error", err)
continue
}
totalFetchedResourcesCount += fetchedResourcesCount
updatedResourcesCount += len(resources)
if len(resources) == 0 {
s.logger.Info(ctx, "no changes in this provider", "provider_urn", p.URN)
continue
}
s.logger.Info(ctx, "resources added", "provider_urn", p.URN, "count", len(flattenResources(resources)))
if err := s.resourceService.BulkUpsert(ctx, resources); err != nil {
failedProviders[p.URN] = err
s.logger.Error(ctx, "failed to add resources", "provider_urn", p.URN, "error", err)
}
s.logger.Info(ctx, "fetching resources completed", "provider_urn", p.URN, "duration", time.Since(startTime))
}

s.logger.Info(ctx, "resources", "count", totalFetchedResourcesCount, "upserted", updatedResourcesCount)
if len(failedProviders) > 0 {
var urns []string
for providerURN, err := range failedProviders {
Expand Down Expand Up @@ -562,18 +569,18 @@ func (s *Service) IsExclusiveRoleAssignment(ctx context.Context, providerType, r
return false
}

func (s *Service) getResources(ctx context.Context, p *domain.Provider) ([]*domain.Resource, error) {
func (s *Service) fetchNewResources(ctx context.Context, p *domain.Provider) ([]*domain.Resource, int, error) {
c := s.getClient(p.Type)
if c == nil {
return nil, fmt.Errorf("%w: %v", ErrInvalidProviderType, p.Type)
return nil, 0, fmt.Errorf("%w: %v", ErrInvalidProviderType, p.Type)
}

existingGuardianResources, err := s.resourceService.Find(ctx, domain.ListResourcesFilter{
ProviderType: p.Type,
ProviderURN: p.URN,
})
if err != nil {
return nil, err
return nil, 0, err
}

resourceTypeFilterMap := make(map[string]string)
Expand All @@ -585,15 +592,14 @@ func (s *Service) getResources(ctx context.Context, p *domain.Provider) ([]*doma

newProviderResources, err := c.GetResources(ctx, p.Config)
if err != nil {
return nil, fmt.Errorf("error fetching resources for %v: %w", p.ID, err)
return nil, 0, fmt.Errorf("error fetching resources for %v: %w", p.ID, err)
}

filteredResources := make([]*domain.Resource, 0)
for _, r := range newProviderResources {
if filterExpression, ok := resourceTypeFilterMap[r.Type]; ok {
v, err := evaluator.Expression(filterExpression).EvaluateWithStruct(r)
if err != nil {
return nil, err
return nil, 0, err
}
if !reflect.ValueOf(v).IsZero() {
filteredResources = append(filteredResources, r)
Expand All @@ -602,40 +608,51 @@ func (s *Service) getResources(ctx context.Context, p *domain.Provider) ([]*doma
filteredResources = append(filteredResources, r)
}
}

flattenedProviderResources := flattenResources(filteredResources)

existingProviderResources := map[string]bool{}
for _, r := range flattenedProviderResources {
for _, er := range existingGuardianResources {
if er.Type == r.Type && er.URN == r.URN {
if existingDetails := er.Details; existingDetails != nil {
if r.Details != nil {
updatedResources := []*domain.Resource{}
for _, newResource := range flattenedProviderResources {
found := false
for _, existingResource := range existingGuardianResources {
if existingResource.Type == newResource.Type && existingResource.URN == newResource.URN {
found = true
if existingDetails := existingResource.Details; existingDetails != nil {
if newResource.Details != nil {
for key, value := range existingDetails {
if _, ok := r.Details[key]; !ok {
r.Details[key] = value
if _, ok := newResource.Details[key]; !ok {
newResource.Details[key] = value
}
}
} else {
r.Details = existingDetails
newResource.Details = existingDetails
}
if isUpdated, diff := compareResources(*existingResource, *newResource); isUpdated {
s.logger.Debug(ctx, "diff", "resources", diff)
updatedResources = append(updatedResources, newResource)
s.logger.Info(ctx, "resources is updated", "resource", newResource.URN)
}
}
existingProviderResources[er.ID] = true
existingProviderResources[existingResource.ID] = true
break
}
}
if !found {
updatedResources = append(updatedResources, newResource)
s.logger.Info(ctx, "new resource added", "resource", newResource.Name)
}
}

// mark IsDeleted of guardian resources that no longer exist in provider
updatedResources := []*domain.Resource{}
for _, r := range existingGuardianResources {
if _, ok := existingProviderResources[r.ID]; !ok {
r.IsDeleted = true
updatedResources = append(updatedResources, r)
s.logger.Info(ctx, "resource deleted", "resource", r.Name)
}
}

newProviderResources = append(filteredResources, updatedResources...)
return newProviderResources, nil
return updatedResources, len(newProviderResources), nil
}

func (s *Service) validateAppealParam(a *domain.Appeal) error {
Expand Down
65 changes: 64 additions & 1 deletion core/provider/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ func (s *ServiceTestSuite) TestFetchResources() {
s.Run("should return error if got any from resource service", func() {
s.mockProviderRepository.EXPECT().Find(mock.MatchedBy(func(ctx context.Context) bool { return true })).Return(providers, nil).Once()
for _, p := range providers {
s.mockProvider.On("GetResources", mockCtx, p.Config).Return([]*domain.Resource{}, nil).Once()
s.mockProvider.On("GetResources", mockCtx, p.Config).Return([]*domain.Resource{
{ID: "test"},
}, nil).Once()
}
expectedError := errors.New("failed to add resources for providers: [mock_provider]")
s.mockResourceService.On("BulkUpsert", mock.Anything, mock.Anything).Return(expectedError).Once()
Expand All @@ -371,6 +373,67 @@ func (s *ServiceTestSuite) TestFetchResources() {
s.EqualError(actualError, expectedError.Error())
})

s.Run("should not upsert any resources when there is no changes", func() {
existingResources := []*domain.Resource{
{
ID: "12ß",
ProviderType: mockProviderType,
ProviderURN: mockProvider,
Type: "test-resource-type",
URN: "test-resource-urn-2",
},
{
ID: "1",
ProviderType: mockProviderType,
ProviderURN: mockProvider,
Type: "test-resource-type",
URN: "test-resource-urn-1",
Details: map[string]interface{}{
"owner": "test-owner",
resource.ReservedDetailsKeyMetadata: map[string]interface{}{
"labels": map[string]string{
"foo": "bar",
"baz": "qux",
},
"x": "y",
},
},
},
}
newResources := []*domain.Resource{
{
ProviderType: mockProviderType,
ProviderURN: mockProvider,
Type: "test-resource-type",
URN: "test-resource-urn-1",
Details: map[string]interface{}{
resource.ReservedDetailsKeyMetadata: map[string]interface{}{
"labels": map[string]string{
"foo": "bar",
"baz": "qux",
},
"x": "y",
},
},
},
{
ID: "12ß",
ProviderType: mockProviderType,
ProviderURN: mockProvider,
Type: "test-resource-type",
URN: "test-resource-urn-2",
},
}

expectedProvider := providers[0]
s.mockProviderRepository.EXPECT().Find(mockCtx).Return([]*domain.Provider{expectedProvider}, nil).Once()
s.mockProvider.EXPECT().GetResources(mockCtx, expectedProvider.Config).Return(newResources, nil).Once()
s.mockResourceService.EXPECT().Find(mock.Anything, mock.Anything).Return(existingResources, nil).Once()
actualError := s.service.FetchResources(context.Background())

s.Nil(actualError)
})

s.Run("should upsert all resources on success", func() {
existingResources := []*domain.Resource{
{
Expand Down

0 comments on commit d71ef25

Please sign in to comment.