Skip to content

Commit

Permalink
APIGOV-29054 - Updates to query monitoring datasource
Browse files Browse the repository at this point in the history
- added config to optionally disable monitoring archive API to fetch metrics and use monitoring query to fetch API metrics
  • Loading branch information
vivekschauhan committed Oct 22, 2024
1 parent 1f25ba7 commit 5fd5414
Show file tree
Hide file tree
Showing 20 changed files with 417 additions and 147 deletions.
1 change: 1 addition & 0 deletions build/mulesoft_traceability_agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mulesoft_traceability_agent:
orgName: "${MULESOFT_ORGNAME}"
cachePath: "${MULESOFT_CACHEPATH:/tmp}"
pollInterval: ${MULESOFT_POLLINTERVAL:20s}
useMonitoringAPI: "${MULESOFT_USEMONITORINGAPI}"
auth:
username: "${MULESOFT_AUTH_USERNAME}"
password: "${MULESOFT_AUTH_PASSWORD}"
Expand Down
75 changes: 74 additions & 1 deletion pkg/anypoint/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
HealthCheckEndpoint = "mulesoft"
monitoringURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d"
metricSummaryURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d/%s"
queryTemplate = `SELECT sum("request_size.count") as request_count, max("response_time.max") as response_max, min("response_time.min") as response_min
FROM "rp_general"."api_summary_metric"
WHERE ("api_id" = '%s' AND "api_version_id" = '%s') AND
time >= %dms and time <= %dms - 30m
GROUP BY "client_id", "status_code"`
)

// Page describes the page query parameter
Expand Down Expand Up @@ -56,6 +61,8 @@ type Client interface {
}

type AnalyticsClient interface {
GetMonitoringBootstrap() (*MonitoringBootInfo, error)
GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error)
GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error)
OnConfigChange(mulesoftConfig *config.MulesoftConfig)
GetClientApplication(appID string) (*Application, error)
Expand Down Expand Up @@ -390,6 +397,72 @@ func (c *AnypointClient) GetExchangeFileContent(link, packaging, mainFile string
return fileContent, wasConverted, err
}

func (c *AnypointClient) GetMonitoringBootstrap() (*MonitoringBootInfo, error) {
headers := map[string]string{
"Authorization": c.getAuthString(c.auth.GetToken()),
}

url := fmt.Sprintf("%s/monitoring/api/visualizer/api/bootdata", c.baseURL)
bootInfo := &MonitoringBootInfo{}
request := coreapi.Request{
Method: coreapi.GET,
URL: url,
Headers: headers,
}

err := c.invokeJSON(request, &bootInfo)
if err != nil {
return nil, err
}

return bootInfo, err
}

// GetMonitoringMetrics returns monitoring data from InfluxDb
func (c *AnypointClient) GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error) {
headers := map[string]string{
"Authorization": c.getAuthString(c.auth.GetToken()),
}

query := fmt.Sprintf(queryTemplate, apiID, apiVersionID, 1729589392, endTime.UnixMilli())
url := fmt.Sprintf("%s/monitoring/api/visualizer/api/datasources/proxy/%d/query", c.baseURL, dataSourceID)
request := coreapi.Request{
Method: coreapi.GET,
URL: url,
Headers: headers,
QueryParams: map[string]string{
"db": dataSourceName,
"q": query,
"epoch": "ms",
},
}
metricResponse := &MetricResponse{}
err := c.invokeJSON(request, metricResponse)
if err != nil {
return nil, err
}
// convert metricResponse
metrics := make([]APIMonitoringMetric, 0)
for _, mr := range metricResponse.Results {
for _, ms := range mr.Series {
m := APIMonitoringMetric{
Time: ms.Time,
Events: []APISummaryMetricEvent{
{
ClientID: ms.Tags.ClientID,
StatusCode: ms.Tags.StatusCode,
RequestSizeCount: int(ms.Count),
ResponseSizeMax: int(ms.ResponseMax),
ResponseSizeMin: int(ms.ResponseMin),
},
},
}
metrics = append(metrics, m)
}
}
return metrics, err
}

// GetMonitoringArchive returns archived monitoring data Mulesoft:
// https://anypoint.mulesoft.com/exchange/portals/anypoint-platform/f1e97bc6-315a-4490-82a7-23abe036327a.anypoint-platform/anypoint-monitoring-archive-api/minor/1.0/pages/home/
func (c *AnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) {
Expand Down Expand Up @@ -464,7 +537,7 @@ func (c *AnypointClient) parseMetricSummaries(metricDataStream []byte) ([]APIMon
}
break
}
metricTime := time.Unix(0, metricData.Time)
metricTime := time.Unix(metricData.Time, 0)
metric := APIMonitoringMetric{
Time: metricTime,
Events: metricData.Events,
Expand Down
105 changes: 33 additions & 72 deletions pkg/anypoint/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package anypoint

import (
"io"
"os"
"testing"
"time"

Expand All @@ -13,9 +15,13 @@ import (
"github.com/stretchr/testify/assert"
)

var metricData = `
{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}}
`
func readTestDataFile(t *testing.T, fileName string) []byte {
file, _ := os.Open(fileName)
inputData, err := io.ReadAll(file)
assert.Nil(t, err)

return inputData
}

func TestClient(t *testing.T) {
cfg := &config.MulesoftConfig{
Expand All @@ -39,82 +45,27 @@ func TestClient(t *testing.T) {
},
"/accounts/api/me": {
Code: 200,
Body: []byte(`{
"user": {
"identityType": "idtype",
"id": "123",
"username": "name",
"firstName": "first",
"lastName": "last",
"email": "email",
"organization": {
"id": "333",
"name": "org1",
"domain": "abc.com"
},
"memberOfOrganizations": [{
"id": "333",
"name": "org1"
},
{
"id": "444",
"name": "BusinessOrg1"
}
]
}
}`),
Body: readTestDataFile(t, "./testdata/user.json"),
},
"/accounts/api/organizations/444/environments": {
Code: 200,
Body: []byte(`{
"data": [{
"id": "111",
"name": "Sandbox",
"organizationId": "444",
"type": "fake",
"clientId": "abc123"
}],
"total": 1
}`),
Body: readTestDataFile(t, "./testdata/org-444-envs.json"),
},
"/accounts/api/organizations/333/environments": {
Code: 200,
Body: []byte(`{
"data": [{
"id": "111",
"name": "name",
"organizationId": "333",
"type": "fake",
"clientId": "abc123"
}],
"total": 1
}`),
Body: readTestDataFile(t, "./testdata/org-333-envs.json"),
},
"/apimanager/api/v1/organizations/444/environments/111/apis": {
Code: 200,
Body: []byte(`{
"assets": [
{
"apis": []
}
],
"total": 1
}`),
Body: readTestDataFile(t, "./testdata/apis.json"),
},
"/apimanager/api/v1/organizations/444/environments/111/apis/10/policies": {
Code: 200,
Body: []byte(`[
{
"id": 0
}
]`),
Body: readTestDataFile(t, "./testdata/policies.json"),
},
"/exchange/api/v2/assets/1/2/3": {
Code: 200,
Body: []byte(`{
"assetId": "petstore"
}`),
Body: readTestDataFile(t, "./testdata/assets.json"),
},
"/icon": {
Code: 200,
Expand All @@ -130,17 +81,19 @@ func TestClient(t *testing.T) {
},
"/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01": {
Code: 200,
Body: []byte(`{
"resources": [
{
"id": "444-111-222.log"
}
]
}`),
Body: readTestDataFile(t, "./testdata/summary-datafiles.json"),
},
"/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01/444-111-222.log": {
Code: 200,
Body: []byte(metricData),
Body: readTestDataFile(t, "./testdata/monitoring-archive.txt"),
},
"/monitoring/api/visualizer/api/bootdata": {
Code: 200,
Body: readTestDataFile(t, "./testdata/boot-data.json"),
},
"/monitoring/api/visualizer/api/datasources/proxy/1234/query": {
Code: 200,
Body: readTestDataFile(t, "./testdata/query-response.json"),
},
}

Expand Down Expand Up @@ -205,6 +158,14 @@ func TestClient(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 1, len(events))

bootInfo, err := client.GetMonitoringBootstrap()
assert.Nil(t, err)
assert.NotNil(t, bootInfo)

events, err = client.GetMonitoringMetrics(bootInfo.Settings.DataSource.InfluxDB.Database, bootInfo.Settings.DataSource.InfluxDB.ID, "222", "222", startTime, startTime)
assert.Nil(t, err)
assert.Equal(t, 1, len(events))

go client.auth.Stop()
done := <-ma.ch
assert.True(t, done)
Expand Down
Loading

0 comments on commit 5fd5414

Please sign in to comment.