Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

group endpoints for duplicate instances #46

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package common

import "fmt"
import (
"fmt"
"net/url"
"strconv"
)

const (
AttrAPIID = "API ID"
Expand All @@ -14,3 +18,43 @@ const (
func FormatAPICacheKey(apiID, stageName string) string {
return fmt.Sprintf("%s-%s", apiID, stageName)
}

// FormatEndpointKey key to use to look up endpoints for a service
func FormatEndpointKey(assetID, productVersion, assetVersion string) string {
return fmt.Sprintf("%s-%s-%s", assetID, productVersion, assetVersion)
}

// ParseEndpoint parses an endpoint
func ParseEndpoint(endpointURL string) (host, basePath, scheme string, port int32, err error) {
endpoint, err := url.Parse(endpointURL)
scheme = endpoint.Scheme
if err != nil {
return "", "", "", 0, err
}

basePath = ""
if endpoint.Path == "" {
basePath = "/"
} else {
basePath = endpoint.Path
}

var portInt int
if scheme == "http" {
portInt = 80
} else {
portInt = 443
}

strPort := endpoint.Port()
if strPort != "" {
portInt, err = strconv.Atoi(strPort)
if err != nil {
return "", "", "", 0, err
}
}

port = int32(portInt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this should be
i, err := strconv.ParseInt(portInt, 10, 32)
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that might be better


return endpoint.Hostname(), basePath, scheme, port, nil
}
1 change: 1 addition & 0 deletions pkg/discovery/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildServiceBody(service *ServiceDetail) (apic.ServiceBody, error) {
SetImageContentType(service.ImageContentType).
SetResourceType(service.ResourceType).
SetServiceAttribute(service.ServiceAttributes).
SetServiceEndpoints(service.Endpoints).
SetStage(service.Stage).
SetState(service.State).
SetStatus(service.Status).
Expand Down
74 changes: 70 additions & 4 deletions pkg/discovery/servicehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type serviceHandler struct {
cache cache.Cache
}

type endpointsMap map[string][]string

func (s *serviceHandler) OnConfigChange(cfg *config.MulesoftConfig) {
s.discoveryTags = cleanTags(cfg.DiscoveryTags)
s.discoveryIgnoreTags = cleanTags(cfg.DiscoveryIgnoreTags)
Expand All @@ -56,6 +58,8 @@ func (s *serviceHandler) OnConfigChange(cfg *config.MulesoftConfig) {
// can resolve to multiple ServiceDetails.
func (s *serviceHandler) ToServiceDetails(asset *anypoint.Asset) []*ServiceDetail {
serviceDetails := []*ServiceDetail{}
// key - the api service revision, formatted as 'assetID-productVersion-assetVersion'. value - the endpoints to set for that revision
endpoints := endpointsMap{}
logger := logrus.WithFields(logrus.Fields{
"assetName": asset.AssetID,
"assetID": asset.ID,
Expand All @@ -70,7 +74,7 @@ func (s *serviceHandler) ToServiceDetails(asset *anypoint.Asset) []*ServiceDetai
continue
}

serviceDetail, err := s.getServiceDetail(asset, &api)
serviceDetail, err := s.getServiceDetail(asset, &api, endpoints)
if err != nil {
logger.Errorf("error getting the service details: %s", err.Error())
continue
Expand All @@ -79,17 +83,22 @@ func (s *serviceHandler) ToServiceDetails(asset *anypoint.Asset) []*ServiceDetai
serviceDetails = append(serviceDetails, serviceDetail)
}
}
for _, service := range serviceDetails {
addServiceEndpoint(service, endpoints, logger)
}
return serviceDetails
}

// getServiceDetail gets the ServiceDetail for the API asset.
func (s *serviceHandler) getServiceDetail(asset *anypoint.Asset, api *anypoint.API) (*ServiceDetail, error) {
func (s *serviceHandler) getServiceDetail(asset *anypoint.Asset, api *anypoint.API, endpoints endpointsMap) (*ServiceDetail, error) {
// set the count to 0 for each one so it is not included when hashing the api.
api.ActiveContractsCount = 0
logger := logrus.WithFields(logrus.Fields{
"assetName": asset.AssetID,
"assetID": asset.ID,
"apiID": api.ID,
"apiAssetVersion": api.AssetVersion,
"productVersion": api.ProductVersion,
})

// Get the policies associated with the API
Expand All @@ -116,6 +125,13 @@ func (s *serviceHandler) getServiceDetail(asset *anypoint.Asset, api *anypoint.A
logger.Error(err)
}

endpointKey := common.FormatEndpointKey(api.AssetID, api.ProductVersion, api.AssetVersion)
isInstance := saveEndpoint(endpoints, endpointKey, api.EndpointURI)
if isInstance {
logger.Debug("discovered as a new endpoint for an existing instance.")
return nil, nil
}

apiID := strconv.FormatInt(api.ID, 10)

subSchName := s.subscriptionManager.GetSubscriptionSchemaName(config.PolicyDetail{
Expand Down Expand Up @@ -184,6 +200,7 @@ func (s *serviceHandler) getServiceDetail(asset *anypoint.Asset, api *anypoint.A
APISpec: modifiedSpec,
AuthPolicy: authPolicy,
Description: api.Description,
Endpoints: make([]apic.EndpointDefinition, 0),
ID: fmt.Sprint(asset.ID),
Image: icon,
ImageContentType: iconContentType,
Expand All @@ -195,10 +212,10 @@ func (s *serviceHandler) getServiceDetail(asset *anypoint.Asset, api *anypoint.A
common.AttrChecksum: checksum,
common.AttrProductVersion: api.ProductVersion,
},
Stage: api.ProductVersion,
Stage: api.AssetVersion,
Tags: api.Tags,
Title: asset.ExchangeAssetName,
Version: api.ProductVersion,
Version: api.AssetVersion,
SubscriptionName: subSchName,
Status: status,
}, nil
Expand Down Expand Up @@ -534,3 +551,52 @@ func getMapFromInterface(item interface{}) map[string]interface{} {
}
return conf
}

// saveEndpoint saves an endpoint to the map. Returns true if the endpoint key is already in the map
// which means the current API should not be processed as its own service or revision.
// Instead, it should be added as an endpoint to an existing instance.
func saveEndpoint(endpoints endpointsMap, endpointKey, endpointURI string) (isInstance bool) {
epList, ok := endpoints[endpointKey]
if ok {
isFound := false
for _, v := range epList {
if v == endpointURI {
isFound = true
break
}
}
if !isFound {
epList = append(epList, endpointURI)
endpoints[endpointKey] = epList
return true
}
} else {
endpoints[endpointKey] = []string{endpointURI}
}
return false
}

func addServiceEndpoint(service *ServiceDetail, endpoints endpointsMap, logger *logrus.Entry) {
attrs := service.ServiceAttributes
assetVersion := attrs[common.AttrAssetVersion]
productVersion := attrs[common.AttrProductVersion]
assetName := service.APIName
key := common.FormatEndpointKey(assetName, productVersion, assetVersion)

val, ok := endpoints[key]
if ok {
for _, ep := range val {
host, basePath, scheme, port, err := common.ParseEndpoint(ep)
if err != nil {
logger.Error(err)
}
def := apic.EndpointDefinition{
Host: host,
Port: port,
Protocol: scheme,
BasePath: basePath,
}
service.Endpoints = append(service.Endpoints, def)
}
}
}
10 changes: 6 additions & 4 deletions pkg/discovery/servicehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func TestServiceHandler(t *testing.T) {
assert.Equal(t, apic.Apikey, item.AuthPolicy)
assert.Equal(t, fmt.Sprint(asset.ID), item.ID)
assert.Equal(t, apic.Oas3, item.ResourceType)
assert.Equal(t, api.ProductVersion, item.Stage)
assert.Equal(t, api.AssetVersion, item.Stage)
assert.Equal(t, asset.ExchangeAssetName, item.Title)
assert.Equal(t, api.ProductVersion, item.Version)
assert.Equal(t, api.AssetVersion, item.Version)
assert.Equal(t, api.Tags, item.Tags)
assert.NotEmpty(t, item.ServiceAttributes[common.AttrChecksum])
assert.Equal(t, fmt.Sprint(api.ID), item.ServiceAttributes[common.AttrAPIID])
Expand Down Expand Up @@ -191,7 +191,8 @@ func TestServiceHandlerGetPolicyError(t *testing.T) {
cache: cache.New(),
subscriptionManager: &mockSchemaHandler{},
}
sd, err := sh.getServiceDetail(&asset, &asset.APIs[0])
endpoints := endpointsMap{}
sd, err := sh.getServiceDetail(&asset, &asset.APIs[0], endpoints)

assert.Nil(t, sd)
assert.Equal(t, expectedErr, err)
Expand All @@ -212,7 +213,8 @@ func TestServiceHandlerGetExchangeAssetError(t *testing.T) {
subscriptionManager: &mockSchemaHandler{},
cache: cache.New(),
}
sd, err := sh.getServiceDetail(&asset, &asset.APIs[0])
endpoints := endpointsMap{}
sd, err := sh.getServiceDetail(&asset, &asset.APIs[0], endpoints)

assert.Nil(t, sd)
assert.Equal(t, expectedErr, err)
Expand Down
6 changes: 5 additions & 1 deletion pkg/discovery/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package discovery

import "github.com/Axway/agents-mulesoft/pkg/anypoint"
import (
"github.com/Axway/agent-sdk/pkg/apic"
"github.com/Axway/agents-mulesoft/pkg/anypoint"
)

// ServiceDetail is the information for the ex
type ServiceDetail struct {
Expand All @@ -10,6 +13,7 @@ type ServiceDetail struct {
AuthPolicy string
Description string
Documentation []byte
Endpoints []apic.EndpointDefinition
ID string
Image string
ImageContentType string
Expand Down