From 5fd541426139354841566c5e206d7d70cd26a01a Mon Sep 17 00:00:00 2001 From: Vivek Singh Chauhan Date: Wed, 23 Oct 2024 00:01:49 +0100 Subject: [PATCH] APIGOV-29054 - Updates to query monitoring datasource - added config to optionally disable monitoring archive API to fetch metrics and use monitoring query to fetch API metrics --- build/mulesoft_traceability_agent.yml | 1 + pkg/anypoint/client.go | 75 +++++++++- pkg/anypoint/client_test.go | 105 +++++--------- pkg/anypoint/monitoring.go | 143 +++++++++++++++++++ pkg/anypoint/testdata/apis.json | 8 ++ pkg/anypoint/testdata/assets.json | 3 + pkg/anypoint/testdata/boot-data.json | 10 ++ pkg/anypoint/testdata/monitoring-archive.txt | 1 + pkg/anypoint/testdata/org-333-envs.json | 10 ++ pkg/anypoint/testdata/org-444-envs.json | 10 ++ pkg/anypoint/testdata/policies.json | 5 + pkg/anypoint/testdata/query-response.json | 30 ++++ pkg/anypoint/testdata/summary-datafiles.json | 7 + pkg/anypoint/testdata/user.json | 25 ++++ pkg/anypoint/types.go | 50 ------- pkg/config/config.go | 4 + pkg/traceability/agent.go | 2 +- pkg/traceability/agent_test.go | 11 +- pkg/traceability/muleemitter.go | 57 +++++--- pkg/traceability/muleemitter_test.go | 7 +- 20 files changed, 417 insertions(+), 147 deletions(-) create mode 100644 pkg/anypoint/monitoring.go create mode 100644 pkg/anypoint/testdata/apis.json create mode 100644 pkg/anypoint/testdata/assets.json create mode 100644 pkg/anypoint/testdata/boot-data.json create mode 100644 pkg/anypoint/testdata/monitoring-archive.txt create mode 100644 pkg/anypoint/testdata/org-333-envs.json create mode 100644 pkg/anypoint/testdata/org-444-envs.json create mode 100644 pkg/anypoint/testdata/policies.json create mode 100644 pkg/anypoint/testdata/query-response.json create mode 100644 pkg/anypoint/testdata/summary-datafiles.json create mode 100644 pkg/anypoint/testdata/user.json diff --git a/build/mulesoft_traceability_agent.yml b/build/mulesoft_traceability_agent.yml index f93ad90..eb5ac41 100644 --- a/build/mulesoft_traceability_agent.yml +++ b/build/mulesoft_traceability_agent.yml @@ -46,6 +46,7 @@ mulesoft_traceability_agent: orgName: "${MULESOFT_ORGNAME}" cachePath: "${MULESOFT_CACHEPATH:/tmp}" pollInterval: ${MULESOFT_POLLINTERVAL:20s} + useMonitoringAPI: "${MULESOFT_USEMONITORINGAPI}" auth: username: "${MULESOFT_AUTH_USERNAME}" password: "${MULESOFT_AUTH_PASSWORD}" diff --git a/pkg/anypoint/client.go b/pkg/anypoint/client.go index 571669b..eeabe9f 100644 --- a/pkg/anypoint/client.go +++ b/pkg/anypoint/client.go @@ -25,6 +25,11 @@ const ( HealthCheckEndpoint = "mulesoft" monitoringURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d" metricSummaryURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d/%s" + queryTemplate = `SELECT sum("request_size.count") as request_count, max("response_time.max") as response_max, min("response_time.min") as response_min +FROM "rp_general"."api_summary_metric" +WHERE ("api_id" = '%s' AND "api_version_id" = '%s') AND +time >= %dms and time <= %dms - 30m +GROUP BY "client_id", "status_code"` ) // Page describes the page query parameter @@ -56,6 +61,8 @@ type Client interface { } type AnalyticsClient interface { + GetMonitoringBootstrap() (*MonitoringBootInfo, error) + GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) OnConfigChange(mulesoftConfig *config.MulesoftConfig) GetClientApplication(appID string) (*Application, error) @@ -390,6 +397,72 @@ func (c *AnypointClient) GetExchangeFileContent(link, packaging, mainFile string return fileContent, wasConverted, err } +func (c *AnypointClient) GetMonitoringBootstrap() (*MonitoringBootInfo, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + + url := fmt.Sprintf("%s/monitoring/api/visualizer/api/bootdata", c.baseURL) + bootInfo := &MonitoringBootInfo{} + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, + } + + err := c.invokeJSON(request, &bootInfo) + if err != nil { + return nil, err + } + + return bootInfo, err +} + +// GetMonitoringMetrics returns monitoring data from InfluxDb +func (c *AnypointClient) GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + + query := fmt.Sprintf(queryTemplate, apiID, apiVersionID, 1729589392, endTime.UnixMilli()) + url := fmt.Sprintf("%s/monitoring/api/visualizer/api/datasources/proxy/%d/query", c.baseURL, dataSourceID) + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, + QueryParams: map[string]string{ + "db": dataSourceName, + "q": query, + "epoch": "ms", + }, + } + metricResponse := &MetricResponse{} + err := c.invokeJSON(request, metricResponse) + if err != nil { + return nil, err + } + // convert metricResponse + metrics := make([]APIMonitoringMetric, 0) + for _, mr := range metricResponse.Results { + for _, ms := range mr.Series { + m := APIMonitoringMetric{ + Time: ms.Time, + Events: []APISummaryMetricEvent{ + { + ClientID: ms.Tags.ClientID, + StatusCode: ms.Tags.StatusCode, + RequestSizeCount: int(ms.Count), + ResponseSizeMax: int(ms.ResponseMax), + ResponseSizeMin: int(ms.ResponseMin), + }, + }, + } + metrics = append(metrics, m) + } + } + return metrics, err +} + // GetMonitoringArchive returns archived monitoring data Mulesoft: // https://anypoint.mulesoft.com/exchange/portals/anypoint-platform/f1e97bc6-315a-4490-82a7-23abe036327a.anypoint-platform/anypoint-monitoring-archive-api/minor/1.0/pages/home/ func (c *AnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) { @@ -464,7 +537,7 @@ func (c *AnypointClient) parseMetricSummaries(metricDataStream []byte) ([]APIMon } break } - metricTime := time.Unix(0, metricData.Time) + metricTime := time.Unix(metricData.Time, 0) metric := APIMonitoringMetric{ Time: metricTime, Events: metricData.Events, diff --git a/pkg/anypoint/client_test.go b/pkg/anypoint/client_test.go index f82a923..95c6c32 100644 --- a/pkg/anypoint/client_test.go +++ b/pkg/anypoint/client_test.go @@ -1,6 +1,8 @@ package anypoint import ( + "io" + "os" "testing" "time" @@ -13,9 +15,13 @@ import ( "github.com/stretchr/testify/assert" ) -var metricData = ` -{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}} -` +func readTestDataFile(t *testing.T, fileName string) []byte { + file, _ := os.Open(fileName) + inputData, err := io.ReadAll(file) + assert.Nil(t, err) + + return inputData +} func TestClient(t *testing.T) { cfg := &config.MulesoftConfig{ @@ -39,82 +45,27 @@ func TestClient(t *testing.T) { }, "/accounts/api/me": { Code: 200, - Body: []byte(`{ - "user": { - "identityType": "idtype", - "id": "123", - "username": "name", - "firstName": "first", - "lastName": "last", - "email": "email", - "organization": { - "id": "333", - "name": "org1", - "domain": "abc.com" - }, - "memberOfOrganizations": [{ - "id": "333", - "name": "org1" - }, - { - "id": "444", - "name": "BusinessOrg1" - } - ] - - } - }`), + Body: readTestDataFile(t, "./testdata/user.json"), }, "/accounts/api/organizations/444/environments": { Code: 200, - Body: []byte(`{ - "data": [{ - "id": "111", - "name": "Sandbox", - "organizationId": "444", - "type": "fake", - "clientId": "abc123" - }], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/org-444-envs.json"), }, "/accounts/api/organizations/333/environments": { Code: 200, - Body: []byte(`{ - "data": [{ - "id": "111", - "name": "name", - "organizationId": "333", - "type": "fake", - "clientId": "abc123" - }], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/org-333-envs.json"), }, "/apimanager/api/v1/organizations/444/environments/111/apis": { Code: 200, - Body: []byte(`{ - "assets": [ - { - "apis": [] - } - ], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/apis.json"), }, "/apimanager/api/v1/organizations/444/environments/111/apis/10/policies": { Code: 200, - Body: []byte(`[ - { - "id": 0 - } - ]`), + Body: readTestDataFile(t, "./testdata/policies.json"), }, "/exchange/api/v2/assets/1/2/3": { Code: 200, - Body: []byte(`{ - "assetId": "petstore" - }`), + Body: readTestDataFile(t, "./testdata/assets.json"), }, "/icon": { Code: 200, @@ -130,17 +81,19 @@ func TestClient(t *testing.T) { }, "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01": { Code: 200, - Body: []byte(`{ - "resources": [ - { - "id": "444-111-222.log" - } - ] - }`), + Body: readTestDataFile(t, "./testdata/summary-datafiles.json"), }, "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01/444-111-222.log": { Code: 200, - Body: []byte(metricData), + Body: readTestDataFile(t, "./testdata/monitoring-archive.txt"), + }, + "/monitoring/api/visualizer/api/bootdata": { + Code: 200, + Body: readTestDataFile(t, "./testdata/boot-data.json"), + }, + "/monitoring/api/visualizer/api/datasources/proxy/1234/query": { + Code: 200, + Body: readTestDataFile(t, "./testdata/query-response.json"), }, } @@ -205,6 +158,14 @@ func TestClient(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 1, len(events)) + bootInfo, err := client.GetMonitoringBootstrap() + assert.Nil(t, err) + assert.NotNil(t, bootInfo) + + events, err = client.GetMonitoringMetrics(bootInfo.Settings.DataSource.InfluxDB.Database, bootInfo.Settings.DataSource.InfluxDB.ID, "222", "222", startTime, startTime) + assert.Nil(t, err) + assert.Equal(t, 1, len(events)) + go client.auth.Stop() done := <-ma.ch assert.True(t, done) diff --git a/pkg/anypoint/monitoring.go b/pkg/anypoint/monitoring.go new file mode 100644 index 0000000..1e75d0d --- /dev/null +++ b/pkg/anypoint/monitoring.go @@ -0,0 +1,143 @@ +package anypoint + +import ( + "encoding/json" + "time" +) + +// Monitoring Archive API metrics data definitions +type APIMonitoringMetric struct { + Time time.Time + Events []APISummaryMetricEvent +} + +type DataFile struct { + ID string `json:"id"` + Time time.Time `json:"time"` + Size int `json:"size"` +} + +type DataFileResources struct { + Resources []DataFile `json:"resources"` +} + +type APISummaryMetricEvent struct { + APIName string `json:"api_name"` + APIVersion string `json:"api_version"` + APIVersionID string `json:"api_version_id"` + ClientID string `json:"client_id"` + Method string `json:"method"` + StatusCode string `json:"status_code"` + ResponseSizeCount int `json:"response_size.count"` + ResponseSizeMax int `json:"response_size.max"` + ResponseSizeMin int `json:"response_size.min"` + ResponseSizeSos int `json:"response_size.sos"` + ResponseSizeSum int `json:"response_size.sum"` + ResponseTimeCount int `json:"response_time.count"` + ResponseTimeMax int `json:"response_time.max"` + ResponseTimeMin int `json:"response_time.min"` + ResponseTimeSos int `json:"response_time.sos"` + ResponseTimeSum int `json:"response_time.sum"` + RequestSizeCount int `json:"request_size.count"` + RequestSizeMax int `json:"request_size.max"` + RequestSizeMin int `json:"request_size.min"` + RequestSizeSos int `json:"request_size.sos"` + RequestSizeSum int `json:"request_size.sum"` + RequestDisposition string `json:"request_disposition"` +} + +type MetricData struct { + Format string `json:"format"` + Time int64 `json:"time"` + Type string `json:"type"` + Metadata map[string]interface{} `json:"metadata"` + Commons map[string]interface{} `json:"commons"` + Events []APISummaryMetricEvent +} + +// Influx DB based metric data definitions +type MonitoringBootInfo struct { + Settings MonitoringBootSetting `json:"Settings"` +} + +type MonitoringBootSetting struct { + DataSource MonitoringDataSource `json:"datasources"` +} + +type MonitoringDataSource struct { + InfluxDB InfluxDB `json:"influxdb"` +} + +type InfluxDB struct { + ID int `json:"id"` + Database string `json:"database"` +} + +type MetricResponse struct { + Results []*MetricResult `json:"results"` +} + +type MetricResult struct { + StatementID int `json:"statement_id"` + Series []*MetricSeries `json:"series"` +} + +type MetricTag struct { + ClientID string `json:"client_id"` + StatusCode string `json:"status_code"` +} + +type MetricSeries struct { + Name string `json:"name"` + Tags *MetricTag `json:"tags"` + Columns []string `json:"columns"` + Values [][]float64 `json:"values"` + Time time.Time `json:"-"` + Count int64 `json:"-"` + ResponseMax int64 `json:"-"` + ResponseMin int64 `json:"-"` +} + +func (ms *MetricSeries) UnmarshalJSON(data []byte) error { + type alias MetricSeries + v := &struct{ *alias }{ + alias: (*alias)(ms), + } + + err := json.Unmarshal(data, v) + if err != nil { + return err + } + + tm := ms.getValue("time") + ms.Time = time.Unix(tm, 0) + ms.Count = ms.getValue("request_count") + ms.ResponseMax = ms.getValue("response_max") + ms.ResponseMin = ms.getValue("response_min") + + return nil +} + +func (ms *MetricSeries) getValue(columnName string) int64 { + return ms.getMetricSeriesIndexValue(ms.getMetricSeriesColumnIndex(columnName)) +} + +func (ms *MetricSeries) getMetricSeriesIndexValue(index int) int64 { + if len(ms.Values) > 0 { + val := ms.Values[0] + if index >= 0 && index < len(val) { + return int64(val[index]) + } + } + + return 0 +} + +func (ms *MetricSeries) getMetricSeriesColumnIndex(column string) int { + for n := 0; n < len(ms.Columns); n++ { + if ms.Columns[n] == column { + return n + } + } + return -1 +} diff --git a/pkg/anypoint/testdata/apis.json b/pkg/anypoint/testdata/apis.json new file mode 100644 index 0000000..f431010 --- /dev/null +++ b/pkg/anypoint/testdata/apis.json @@ -0,0 +1,8 @@ +{ + "assets": [ + { + "apis": [] + } + ], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/assets.json b/pkg/anypoint/testdata/assets.json new file mode 100644 index 0000000..555fe8e --- /dev/null +++ b/pkg/anypoint/testdata/assets.json @@ -0,0 +1,3 @@ +{ + "assetId": "petstore" +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/boot-data.json b/pkg/anypoint/testdata/boot-data.json new file mode 100644 index 0000000..d882678 --- /dev/null +++ b/pkg/anypoint/testdata/boot-data.json @@ -0,0 +1,10 @@ +{ + "Settings": { + "datasources": { + "influxdb": { + "database": "\"db\"", + "id": 1234 + } + } + } +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/monitoring-archive.txt b/pkg/anypoint/testdata/monitoring-archive.txt new file mode 100644 index 0000000..0f5ab9f --- /dev/null +++ b/pkg/anypoint/testdata/monitoring-archive.txt @@ -0,0 +1 @@ +{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}} \ No newline at end of file diff --git a/pkg/anypoint/testdata/org-333-envs.json b/pkg/anypoint/testdata/org-333-envs.json new file mode 100644 index 0000000..9adbc07 --- /dev/null +++ b/pkg/anypoint/testdata/org-333-envs.json @@ -0,0 +1,10 @@ +{ + "data": [{ + "id": "111", + "name": "name", + "organizationId": "333", + "type": "fake", + "clientId": "abc123" + }], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/org-444-envs.json b/pkg/anypoint/testdata/org-444-envs.json new file mode 100644 index 0000000..ee07467 --- /dev/null +++ b/pkg/anypoint/testdata/org-444-envs.json @@ -0,0 +1,10 @@ +{ + "data": [{ + "id": "111", + "name": "Sandbox", + "organizationId": "444", + "type": "fake", + "clientId": "abc123" + }], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/policies.json b/pkg/anypoint/testdata/policies.json new file mode 100644 index 0000000..db98f51 --- /dev/null +++ b/pkg/anypoint/testdata/policies.json @@ -0,0 +1,5 @@ +[ + { + "id": 0 + } +] \ No newline at end of file diff --git a/pkg/anypoint/testdata/query-response.json b/pkg/anypoint/testdata/query-response.json new file mode 100644 index 0000000..e90f220 --- /dev/null +++ b/pkg/anypoint/testdata/query-response.json @@ -0,0 +1,30 @@ +{ + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "api_summary_metric", + "tags": { + "client_id": "", + "status_code": "200" + }, + "columns": [ + "time", + "request_count", + "response_max", + "response_min" + ], + "values": [ + [ + 1729589392, + 4, + 3243, + 74 + ] + ] + } + ] + } + ] + } \ No newline at end of file diff --git a/pkg/anypoint/testdata/summary-datafiles.json b/pkg/anypoint/testdata/summary-datafiles.json new file mode 100644 index 0000000..8425fa8 --- /dev/null +++ b/pkg/anypoint/testdata/summary-datafiles.json @@ -0,0 +1,7 @@ +{ + "resources": [ + { + "id": "444-111-222.log" + } + ] +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/user.json b/pkg/anypoint/testdata/user.json new file mode 100644 index 0000000..c3b9675 --- /dev/null +++ b/pkg/anypoint/testdata/user.json @@ -0,0 +1,25 @@ +{ + "user": { + "identityType": "idtype", + "id": "123", + "username": "name", + "firstName": "first", + "lastName": "last", + "email": "email", + "organization": { + "id": "333", + "name": "org1", + "domain": "abc.com" + }, + "memberOfOrganizations": [{ + "id": "333", + "name": "org1" + }, + { + "id": "444", + "name": "BusinessOrg1" + } + ] + + } +} \ No newline at end of file diff --git a/pkg/anypoint/types.go b/pkg/anypoint/types.go index a553280..b0dc62e 100644 --- a/pkg/anypoint/types.go +++ b/pkg/anypoint/types.go @@ -184,56 +184,6 @@ type ExchangeFile struct { SHA1 string `json:"sha1"` } -// APIMonitoringMetric - -type APIMonitoringMetric struct { - Time time.Time - Events []APISummaryMetricEvent -} - -type DataFile struct { - ID string `json:"id"` - Time time.Time `json:"time"` - Size int `json:"size"` -} - -type DataFileResources struct { - Resources []DataFile `json:"resources"` -} - -type APISummaryMetricEvent struct { - APIName string `json:"api_name"` - APIVersion string `json:"api_version"` - APIVersionID string `json:"api_version_id"` - ClientID string `json:"client_id"` - Method string `json:"method"` - StatusCode string `json:"status_code"` - ResponseSizeCount int `json:"response_size.count"` - ResponseSizeMax int `json:"response_size.max"` - ResponseSizeMin int `json:"response_size.min"` - ResponseSizeSos int `json:"response_size.sos"` - ResponseSizeSum int `json:"response_size.sum"` - ResponseTimeCount int `json:"response_time.count"` - ResponseTimeMax int `json:"response_time.max"` - ResponseTimeMin int `json:"response_time.min"` - ResponseTimeSos int `json:"response_time.sos"` - ResponseTimeSum int `json:"response_time.sum"` - RequestSizeCount int `json:"request_size.count"` - RequestSizeMax int `json:"request_size.max"` - RequestSizeMin int `json:"request_size.min"` - RequestSizeSos int `json:"request_size.sos"` - RequestSizeSum int `json:"request_size.sum"` - RequestDisposition string `json:"request_disposition"` -} - -type MetricData struct { - Format string `json:"format"` - Time int64 `json:"time"` - Type string `json:"type"` - Metadata map[string]interface{} `json:"metadata"` - Commons map[string]interface{} `json:"commons"` - Events []APISummaryMetricEvent -} - type Application struct { APIEndpoints bool `json:"apiEndpoints,omitempty"` ClientID string `json:"clientId"` diff --git a/pkg/config/config.go b/pkg/config/config.go index 854ffd3..5f39933 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -43,6 +43,7 @@ const ( pathProxyURL = "mulesoft.proxyUrl" pathCachePath = "mulesoft.cachePath" pathDiscoverOriginalRaml = "mulesoft.discoverOriginalRaml" + pathUseMonitoringAPI = "mulesoft.useMonitoringAPI" ) const ( @@ -87,6 +88,7 @@ type MulesoftConfig struct { ClientID string `config:"auth.clientID"` ClientSecret string `config:"auth.clientSecret"` DiscoverOriginalRaml bool `config:"discoverOriginalRaml"` + UseMonitoringAPI bool `config:"useMonitoringAPI"` } // ValidateCfg - Validates the gateway config @@ -140,6 +142,7 @@ func AddConfigProperties(rootProps props) { rootProps.AddStringProperty(pathSSLMinVersion, corecfg.TLSDefaultMinVersionString(), "Minimum acceptable SSL/TLS protocol version.") rootProps.AddStringProperty(pathSSLMaxVersion, "0", "Maximum acceptable SSL/TLS protocol version.") rootProps.AddBoolProperty(pathDiscoverOriginalRaml, false, "If RAML API specs are discovered as RAML and not converted to OAS") + rootProps.AddBoolProperty(pathUseMonitoringAPI, true, "Flag to setup traceability agent to use Anypoint Monitoring Archive API") } // NewMulesoftConfig - parse the props and create an Mulesoft Configuration structure @@ -165,5 +168,6 @@ func NewMulesoftConfig(rootProps props) *MulesoftConfig { MaxVersion: corecfg.TLSVersionAsValue(rootProps.StringPropertyValue(pathSSLMaxVersion)), }, DiscoverOriginalRaml: rootProps.BoolPropertyValue(pathDiscoverOriginalRaml), + UseMonitoringAPI: rootProps.BoolPropertyValue(pathUseMonitoringAPI), } } diff --git a/pkg/traceability/agent.go b/pkg/traceability/agent.go index 20adebd..1fa8a0a 100644 --- a/pkg/traceability/agent.go +++ b/pkg/traceability/agent.go @@ -48,7 +48,7 @@ func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) { var err error client := anypoint.NewClient(agentConfig.MulesoftConfig) - emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig.CachePath, eventChannel, client, agent.GetCacheManager()) + emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig, eventChannel, client, agent.GetCacheManager()) emitterJob, err := NewMuleEventEmitterJob(emitter, pollInterval, traceabilityHealthCheck, hc.GetStatus, hc.RegisterHealthcheck) if err != nil { diff --git a/pkg/traceability/agent_test.go b/pkg/traceability/agent_test.go index 00a7a55..753d987 100644 --- a/pkg/traceability/agent_test.go +++ b/pkg/traceability/agent_test.go @@ -56,10 +56,11 @@ func TestAgent_Run(t *testing.T) { instanceCache := &mockInstaceCache{} svcInst := management.NewAPIServiceInstance("api", "env") util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + util.SetAgentDetailsKey(svcInst, common.AttrAssetID, "1234") svcInst.Metadata.ID = "1234" ri, _ := svcInst.AsInstance() instanceCache.AddAPIServiceInstance(ri) - emitter := NewMuleEventEmitter("/tmp", eventChannel, client, instanceCache) + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventChannel, client, instanceCache) collector := &mockMetricCollector{ channel: processorChannel, } @@ -99,6 +100,14 @@ type mockAnalyticsClient struct { err error } +func (m mockAnalyticsClient) GetMonitoringBootstrap() (*anypoint.MonitoringBootInfo, error) { + return nil, m.err +} + +func (m mockAnalyticsClient) GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]anypoint.APIMonitoringMetric, error) { + return m.events, m.err +} + func (m mockAnalyticsClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]anypoint.APIMonitoringMetric, error) { return m.events, m.err } diff --git a/pkg/traceability/muleemitter.go b/pkg/traceability/muleemitter.go index 982b46c..48b0a18 100644 --- a/pkg/traceability/muleemitter.go +++ b/pkg/traceability/muleemitter.go @@ -38,11 +38,12 @@ type healthChecker func(name, endpoint string, check hc.CheckStatus) (string, er // MuleEventEmitter - Gathers analytics data for publishing to Central. type MuleEventEmitter struct { - client anypoint.AnalyticsClient - eventChannel chan common.MetricEvent - cache cache.Cache - cachePath string - instanceCache instanceCache + client anypoint.AnalyticsClient + eventChannel chan common.MetricEvent + cache cache.Cache + cachePath string + instanceCache instanceCache + useMonitoringAPI bool } // MuleEventEmitterJob wraps an Emitter and implements the Job interface so that it can be executed by the sdk. @@ -55,36 +56,41 @@ type MuleEventEmitterJob struct { } // NewMuleEventEmitter - Creates a client to poll for events. -func NewMuleEventEmitter(cachePath string, eventChannel chan common.MetricEvent, client anypoint.AnalyticsClient, instanceCache instanceCache) *MuleEventEmitter { +func NewMuleEventEmitter(config *config.MulesoftConfig, eventChannel chan common.MetricEvent, client anypoint.AnalyticsClient, instanceCache instanceCache) *MuleEventEmitter { me := &MuleEventEmitter{ - eventChannel: eventChannel, - client: client, - instanceCache: instanceCache, + eventChannel: eventChannel, + client: client, + instanceCache: instanceCache, + useMonitoringAPI: config.UseMonitoringAPI, } - me.cachePath = formatCachePath(cachePath) + me.cachePath = formatCachePath(config.CachePath) me.cache = cache.Load(me.cachePath) return me } // Start retrieves analytics data from anypoint and sends them on the event channel for processing. func (me *MuleEventEmitter) Start() error { + var bootInfo *anypoint.MonitoringBootInfo + if me.useMonitoringAPI { + bi, err := me.client.GetMonitoringBootstrap() + if err != nil { + return err + } + bootInfo = bi + } + // change the cache to store startTime per API instanceKeys := me.instanceCache.GetAPIServiceInstanceKeys() + reportEndTime := time.Now() for _, instanceID := range instanceKeys { instance, _ := me.instanceCache.GetAPIServiceInstanceByID(instanceID) - apiID, _ := util.GetAgentDetailsValue(instance, common.AttrAPIID) + apiID, _ := util.GetAgentDetailsValue(instance, common.AttrAssetID) + apiVersionID, _ := util.GetAgentDetailsValue(instance, common.AttrAPIID) if apiID == "" { continue } - lastAPIReportTime := me.getLastRun(apiID) - metrics, err := me.client.GetMonitoringArchive(apiID, lastAPIReportTime) - - if err != nil { - logrus.WithError(err).Error("failed to get analytics data") - return err - } - + metrics, err := me.getMetrics(bootInfo, apiID, apiVersionID, lastAPIReportTime, reportEndTime) endTime := lastAPIReportTime for _, metric := range metrics { // Report only latest entries, ignore old entries @@ -109,11 +115,24 @@ func (me *MuleEventEmitter) Start() error { } } me.saveLastRun(apiID, endTime) + if err != nil { + logrus.WithError(err).Error("failed to get analytics data") + return err + } } return nil } + +func (me *MuleEventEmitter) getMetrics(bootInfo *anypoint.MonitoringBootInfo, apiID, apiVersionID string, startTime, endTime time.Time) ([]anypoint.APIMonitoringMetric, error) { + if me.useMonitoringAPI { + return me.client.GetMonitoringArchive(apiID, startTime) + } + + return me.client.GetMonitoringMetrics(bootInfo.Settings.DataSource.InfluxDB.Database, bootInfo.Settings.DataSource.InfluxDB.ID, apiID, apiVersionID, startTime, endTime) +} + func (me *MuleEventEmitter) getLastRun(apiID string) time.Time { tStamp, _ := me.cache.Get(CacheKeyTimeStamp + "-" + apiID) // use instance.Metadata.Audit.CreateTimestamp instead of Now() diff --git a/pkg/traceability/muleemitter_test.go b/pkg/traceability/muleemitter_test.go index b4f8767..e24922a 100644 --- a/pkg/traceability/muleemitter_test.go +++ b/pkg/traceability/muleemitter_test.go @@ -71,11 +71,12 @@ func Test_MuleEventEmitter(t *testing.T) { instanceCache := &mockInstaceCache{} svcInst := management.NewAPIServiceInstance("api", "env") util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + util.SetAgentDetailsKey(svcInst, common.AttrAssetID, "1234") svcInst.Metadata.ID = "1234" ri, _ := svcInst.AsInstance() instanceCache.AddAPIServiceInstance(ri) - emitter := NewMuleEventEmitter("/tmp", eventCh, client, instanceCache) + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, instanceCache) assert.NotNil(t, emitter) @@ -89,7 +90,7 @@ func Test_MuleEventEmitter(t *testing.T) { events: []anypoint.APIMonitoringMetric{}, err: fmt.Errorf("failed"), } - emitter = NewMuleEventEmitter("/tmp", eventCh, client, instanceCache) + emitter = NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, instanceCache) err := emitter.Start() assert.Equal(t, client.err, err) } @@ -108,7 +109,7 @@ func TestMuleEventEmitterJob(t *testing.T) { events: []anypoint.APIMonitoringMetric{}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventCh, client, &mockInstaceCache{}) + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, &mockInstaceCache{}) job, err := NewMuleEventEmitterJob(emitter, pollInterval, mockHealthCheck, getStatusSuccess, mockRegisterHC) assert.Nil(t, err)