Skip to content

Commit

Permalink
feat: deduplicate all requests going to API
Browse files Browse the repository at this point in the history
  • Loading branch information
apricote committed Dec 20, 2023
1 parent b7134f8 commit 17b38b9
Show file tree
Hide file tree
Showing 6 changed files with 561 additions and 34 deletions.
131 changes: 112 additions & 19 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/hetznercloud/hcloud-go/v2/hcloud"
)

var logger = log.DefaultLogger

const (
QueryTypeResourceList = "resource-list"
QueryTypeMetrics = "metrics"
Expand Down Expand Up @@ -53,8 +51,16 @@ type Options struct {
type QueryModel struct {
ResourceType ResourceType `json:"resourceType"`
MetricsType MetricsType `json:"metricsType"`
ResourceIDs []int64 `json:"resourceIds"`
}

const (
// DefaultBufferPeriod is the default buffer period for the QueryRunner.
DefaultBufferPeriod = 200 * time.Millisecond
)

var logger = log.DefaultLogger

// Make sure Datasource implements required interfaces. This is important to do
// since otherwise we will only get a not implemented error response from plugin in
// runtime. In this example datasource instance implements backend.QueryDataHandler,
Expand Down Expand Up @@ -102,13 +108,19 @@ func NewDatasource(_ context.Context, settings backend.DataSourceInstanceSetting
client: client,
}

d.queryRunnerServer = NewQueryRunner[hcloud.ServerMetrics](DefaultBufferPeriod, d.serverAPIRequestFn, filterServerMetrics)
d.queryRunnerLoadBalancer = NewQueryRunner[hcloud.LoadBalancerMetrics](DefaultBufferPeriod, d.loadBalancerAPIRequestFn, filterLoadBalancerMetrics)

return d, nil
}

// Datasource is an example datasource which can respond to data queries, reports
// its health and has streaming skills.
type Datasource struct {
client *hcloud.Client

queryRunnerServer *QueryRunner[hcloud.ServerMetrics]
queryRunnerLoadBalancer *QueryRunner[hcloud.LoadBalancerMetrics]
}

// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
Expand Down Expand Up @@ -231,35 +243,42 @@ func (d *Datasource) queryResourceList(ctx context.Context, query backend.DataQu
}

func (d *Datasource) queryMetrics(ctx context.Context, query backend.DataQuery) backend.DataResponse {
var response backend.DataResponse
var resp backend.DataResponse

var qm QueryModel
err := json.Unmarshal(query.JSON, &qm)
if err != nil {
return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, fmt.Sprintf("json unmarshal: %v", err.Error()))
}

step := stepSize(query)
if len(qm.ResourceIDs) == 0 {
// TODO: Return metrics for all matching resources
return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, "no resource IDs provided")
}
if qm.ResourceType != ResourceTypeServer {
return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, fmt.Sprintf("unsupported resouce type: %v", qm.ResourceType))
}

metrics, _, err := d.client.Server.GetMetrics(ctx, &hcloud.Server{ID: 40502748}, hcloud.ServerGetMetricsOpts{
Types: []hcloud.ServerMetricType{metricTypeToServerMetricType(qm.MetricsType)},
Start: query.TimeRange.From,
End: query.TimeRange.To,
Step: step,
})
step := stepSize(query.TimeRange, query.Interval, query.MaxDataPoints)

// add the frames to the response.
response.Frames = append(response.Frames, serverMetricsToFrames(metrics, query.RefID)...)
metrics, _ := d.queryRunnerServer.RequestMetrics(ctx, qm.ResourceIDs, RequestOpts{
MetricsTypes: []MetricsType{qm.MetricsType},
TimeRange: query.TimeRange,
Step: step,
})
for _, serverMetrics := range metrics {
resp.Frames = append(resp.Frames, serverMetricsToFrames(serverMetrics)...)
}

return response
return resp
}

func stepSize(query backend.DataQuery) int {
step := int(math.Floor(query.Interval.Seconds()))
func stepSize(timeRange backend.TimeRange, interval time.Duration, maxDataPoints int64) int {
step := int(math.Floor(interval.Seconds()))

if int64(step) > query.MaxDataPoints {
if int64(step) > maxDataPoints {
// If the query results in more data points than Grafana allows, we need to request a larger step size.
maxInterval := query.TimeRange.Duration().Seconds() / float64(query.MaxDataPoints)
maxInterval := timeRange.Duration().Seconds() / float64(maxDataPoints)
step = int(math.Floor(maxInterval))
}

Expand All @@ -270,13 +289,12 @@ func stepSize(query backend.DataQuery) int {
return step
}

func serverMetricsToFrames(metrics *hcloud.ServerMetrics, refID string) []*data.Frame {
func serverMetricsToFrames(metrics *hcloud.ServerMetrics) []*data.Frame {
frames := make([]*data.Frame, 0, len(metrics.TimeSeries))

// get all keys in map metrics.TimeSeries
for name, series := range metrics.TimeSeries {
frame := data.NewFrame(name)
frame.RefID = refID

timestamps := make([]time.Time, 0, len(series))
values := make([]float64, 0, len(series))
Expand Down Expand Up @@ -479,3 +497,78 @@ func (d *Datasource) getLoadBalancers(ctx context.Context) ([]SelectableValue, e

return selectableValues, nil
}

func (d *Datasource) serverAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.ServerMetrics, error) {
hcloudGoMetricsTypes := make([]hcloud.ServerMetricType, 0, len(opts.MetricsTypes))
for _, metricsType := range opts.MetricsTypes {
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToServerMetricType(metricsType))
}

metrics, _, err := d.client.Server.GetMetrics(ctx, &hcloud.Server{ID: id}, hcloud.ServerGetMetricsOpts{
Types: hcloudGoMetricsTypes,
Start: opts.TimeRange.From,
End: opts.TimeRange.To,
Step: opts.Step,
})

return metrics, err
}

func (d *Datasource) loadBalancerAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.LoadBalancerMetrics, error) {
hcloudGoMetricsTypes := make([]hcloud.LoadBalancerMetricType, 0, len(opts.MetricsTypes))
for _, metricsType := range opts.MetricsTypes {
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToLoadBalancerMetricType(metricsType))
}

metrics, _, err := d.client.LoadBalancer.GetMetrics(ctx, &hcloud.LoadBalancer{ID: id}, hcloud.LoadBalancerGetMetricsOpts{
Types: hcloudGoMetricsTypes,
Start: opts.TimeRange.From,
End: opts.TimeRange.To,
Step: opts.Step,
})

return metrics, err
}

var (
serverMetricsTypeSeries = map[MetricsType][]string{
MetricsTypeServerCPU: {"cpu"},
MetricsTypeServerDisk: {"disk.0.iops.read", "disk.0.iops.write", "disk.0.bandwidth.read", "disk.0.bandwidth.write"},
MetricsTypeServerNetwork: {"network.0.pps.in", "network.0.pps.out", "network.0.bandwidth.in", "network.0.bandwidth.out"},
}

loadBalancerMetricsTypeSeries = map[MetricsType][]string{
MetricsTypeLoadBalancerOpenConnections: {"open_connections"},
MetricsTypeLoadBalancerConnectionsPerSecond: {"connections_per_second"},
MetricsTypeLoadBalancerRequestsPerSecond: {"requests_per_second"},
MetricsTypeLoadBalancerBandwidth: {"bandwidth.in", "bandwidth.out"},
}
)

func filterServerMetrics(metrics *hcloud.ServerMetrics, metricsTypes []MetricsType) *hcloud.ServerMetrics {
metricsCopy := *metrics
metricsCopy.TimeSeries = make(map[string][]hcloud.ServerMetricsValue)

// For every requested metricsType, copy every series into the copied struct
for _, metricsType := range metricsTypes {
for _, series := range serverMetricsTypeSeries[metricsType] {
metricsCopy.TimeSeries[series] = metrics.TimeSeries[series]
}
}

return &metricsCopy
}

func filterLoadBalancerMetrics(metrics *hcloud.LoadBalancerMetrics, metricsTypes []MetricsType) *hcloud.LoadBalancerMetrics {
metricsCopy := *metrics
metricsCopy.TimeSeries = make(map[string][]hcloud.LoadBalancerMetricsValue)

// For every requested metricsType, copy every series into the copied struct
for _, metricsType := range metricsTypes {
for _, series := range loadBalancerMetricsTypeSeries[metricsType] {
metricsCopy.TimeSeries[series] = metrics.TimeSeries[series]
}
}

return &metricsCopy
}
Loading

0 comments on commit 17b38b9

Please sign in to comment.