diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go
index cdb88ca..b9a7cec 100644
--- a/pkg/plugin/datasource.go
+++ b/pkg/plugin/datasource.go
@@ -19,8 +19,6 @@ import (
 	"github.com/hetznercloud/hcloud-go/v2/hcloud"
 )
 
-var logger = log.DefaultLogger
-
 const (
 	QueryTypeResourceList = "resource-list"
 	QueryTypeMetrics      = "metrics"
@@ -53,8 +51,16 @@ type Options struct {
 type QueryModel struct {
 	ResourceType ResourceType `json:"resourceType"`
 	MetricsType  MetricsType  `json:"metricsType"`
+	ResourceIDs  []int64      `json:"resourceIds"`
 }
 
+const (
+	// DefaultBufferPeriod is the default buffer period for the QueryRunner.
+	DefaultBufferPeriod = 200 * time.Millisecond
+)
+
+var logger = log.DefaultLogger
+
 // Make sure Datasource implements required interfaces. This is important to do
 // since otherwise we will only get a not implemented error response from plugin in
 // runtime. In this example datasource instance implements backend.QueryDataHandler,
@@ -102,6 +108,9 @@ func NewDatasource(_ context.Context, settings backend.DataSourceInstanceSetting
 		client: client,
 	}
 
+	d.queryRunnerServer = NewQueryRunner[hcloud.ServerMetrics](DefaultBufferPeriod, d.serverAPIRequestFn, filterServerMetrics)
+	d.queryRunnerLoadBalancer = NewQueryRunner[hcloud.LoadBalancerMetrics](DefaultBufferPeriod, d.loadBalancerAPIRequestFn, filterLoadBalancerMetrics)
+
	return d, nil
 }
 
@@ -109,6 +118,9 @@ func NewDatasource(_ context.Context, settings backend.DataSourceInstanceSetting
 // its health and has streaming skills.
 type Datasource struct {
 	client *hcloud.Client
+
+	queryRunnerServer       *QueryRunner[hcloud.ServerMetrics]
+	queryRunnerLoadBalancer *QueryRunner[hcloud.LoadBalancerMetrics]
 }
 
 // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
@@ -231,7 +243,7 @@
 }
 
 func (d *Datasource) queryMetrics(ctx context.Context, query backend.DataQuery) backend.DataResponse {
-	var response backend.DataResponse
+	var resp backend.DataResponse
 
 	var qm QueryModel
 	err := json.Unmarshal(query.JSON, &qm)
@@ -239,27 +251,37 @@
 		return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, fmt.Sprintf("json unmarshal: %v", err.Error()))
 	}
 
-	step := stepSize(query)
+	if len(qm.ResourceIDs) == 0 {
+		// TODO: Return metrics for all matching resources
+		return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, "no resource IDs provided")
+	}
+	if qm.ResourceType != ResourceTypeServer {
+		return backend.ErrDataResponseWithSource(backend.StatusBadRequest, backend.ErrorSourcePlugin, fmt.Sprintf("unsupported resource type: %v", qm.ResourceType))
+	}
 
-	metrics, _, err := d.client.Server.GetMetrics(ctx, &hcloud.Server{ID: 40502748}, hcloud.ServerGetMetricsOpts{
-		Types: []hcloud.ServerMetricType{metricTypeToServerMetricType(qm.MetricsType)},
-		Start: query.TimeRange.From,
-		End:   query.TimeRange.To,
-		Step:  step,
-	})
+	step := stepSize(query.TimeRange, query.Interval, query.MaxDataPoints)
 
-	// add the frames to the response.
-	response.Frames = append(response.Frames, serverMetricsToFrames(metrics, query.RefID)...)
+	metrics, err := d.queryRunnerServer.RequestMetrics(ctx, qm.ResourceIDs, RequestOpts{
+		MetricsTypes: []MetricsType{qm.MetricsType},
+		TimeRange:    query.TimeRange,
+		Step:         step,
+	})
+	if err != nil {
+		return backend.ErrDataResponseWithSource(backend.StatusInternal, backend.ErrorSourcePlugin, fmt.Sprintf("requesting metrics: %v", err))
+	}
+	for _, serverMetrics := range metrics {
+		resp.Frames = append(resp.Frames, serverMetricsToFrames(serverMetrics)...)
+	}
 
-	return response
+	return resp
 }
 
-func stepSize(query backend.DataQuery) int {
-	step := int(math.Floor(query.Interval.Seconds()))
+func stepSize(timeRange backend.TimeRange, interval time.Duration, maxDataPoints int64) int {
+	step := int(math.Floor(interval.Seconds()))
 
-	if int64(step) > query.MaxDataPoints {
+	if int64(step) > maxDataPoints {
 		// If the query results in more data points than Grafana allows, we need to request a larger step size.
-		maxInterval := query.TimeRange.Duration().Seconds() / float64(query.MaxDataPoints)
+		maxInterval := timeRange.Duration().Seconds() / float64(maxDataPoints)
 		step = int(math.Floor(maxInterval))
 	}
 
@@ -270,13 +292,12 @@ func stepSize(query backend.DataQuery) int {
 	return step
 }
 
-func serverMetricsToFrames(metrics *hcloud.ServerMetrics, refID string) []*data.Frame {
+func serverMetricsToFrames(metrics *hcloud.ServerMetrics) []*data.Frame {
 	frames := make([]*data.Frame, 0, len(metrics.TimeSeries))
 
 	// get all keys in map metrics.TimeSeries
 	for name, series := range metrics.TimeSeries {
 		frame := data.NewFrame(name)
-		frame.RefID = refID
 
 		timestamps := make([]time.Time, 0, len(series))
 		values := make([]float64, 0, len(series))
@@ -479,3 +500,78 @@ func (d *Datasource) getLoadBalancers(ctx context.Context) ([]SelectableValue, e
 
 	return selectableValues, nil
 }
+
+func (d *Datasource) serverAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.ServerMetrics, error) {
+	hcloudGoMetricsTypes := make([]hcloud.ServerMetricType, 0, len(opts.MetricsTypes))
+	for _, metricsType := range opts.MetricsTypes {
+		hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToServerMetricType(metricsType))
+	}
+
+	metrics, _, err := d.client.Server.GetMetrics(ctx, &hcloud.Server{ID: id}, hcloud.ServerGetMetricsOpts{
+		Types: hcloudGoMetricsTypes,
+		Start: opts.TimeRange.From,
+		End:   opts.TimeRange.To,
+		Step:  opts.Step,
+	})
+
+	return metrics, err
+}
+
+func (d *Datasource) loadBalancerAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.LoadBalancerMetrics, error) {
+	hcloudGoMetricsTypes := make([]hcloud.LoadBalancerMetricType, 0, len(opts.MetricsTypes))
+	for _, metricsType := range opts.MetricsTypes {
+		hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToLoadBalancerMetricType(metricsType))
+	}
+
+	metrics, _, err := d.client.LoadBalancer.GetMetrics(ctx, &hcloud.LoadBalancer{ID: id}, hcloud.LoadBalancerGetMetricsOpts{
+		Types: hcloudGoMetricsTypes,
+		Start: opts.TimeRange.From,
+		End:   opts.TimeRange.To,
+		Step:  opts.Step,
+	})
+
+	return metrics, err
+}
+
+var (
+	serverMetricsTypeSeries = map[MetricsType][]string{
+		MetricsTypeServerCPU:     {"cpu"},
+		MetricsTypeServerDisk:    {"disk.0.iops.read", "disk.0.iops.write", "disk.0.bandwidth.read", "disk.0.bandwidth.write"},
+		MetricsTypeServerNetwork: {"network.0.pps.in", "network.0.pps.out", "network.0.bandwidth.in", "network.0.bandwidth.out"},
+	}
+
+	loadBalancerMetricsTypeSeries = map[MetricsType][]string{
+		MetricsTypeLoadBalancerOpenConnections:      {"open_connections"},
+		MetricsTypeLoadBalancerConnectionsPerSecond: {"connections_per_second"},
+		MetricsTypeLoadBalancerRequestsPerSecond:    {"requests_per_second"},
+		MetricsTypeLoadBalancerBandwidth:            {"bandwidth.in", "bandwidth.out"},
"bandwidth.out"}, + } +) + +func filterServerMetrics(metrics *hcloud.ServerMetrics, metricsTypes []MetricsType) *hcloud.ServerMetrics { + metricsCopy := *metrics + metricsCopy.TimeSeries = make(map[string][]hcloud.ServerMetricsValue) + + // For every requested metricsType, copy every series into the copied struct + for _, metricsType := range metricsTypes { + for _, series := range serverMetricsTypeSeries[metricsType] { + metricsCopy.TimeSeries[series] = metrics.TimeSeries[series] + } + } + + return &metricsCopy +} + +func filterLoadBalancerMetrics(metrics *hcloud.LoadBalancerMetrics, metricsTypes []MetricsType) *hcloud.LoadBalancerMetrics { + metricsCopy := *metrics + metricsCopy.TimeSeries = make(map[string][]hcloud.LoadBalancerMetricsValue) + + // For every requested metricsType, copy every series into the copied struct + for _, metricsType := range metricsTypes { + for _, series := range loadBalancerMetricsTypeSeries[metricsType] { + metricsCopy.TimeSeries[series] = metrics.TimeSeries[series] + } + } + + return &metricsCopy +} diff --git a/pkg/plugin/query_runner.go b/pkg/plugin/query_runner.go new file mode 100644 index 0000000..31f0103 --- /dev/null +++ b/pkg/plugin/query_runner.go @@ -0,0 +1,271 @@ +package plugin + +import ( + "context" + "github.com/apricote/grafana-hcloud-datasource/pkg/set" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/hetznercloud/hcloud-go/v2/hcloud" + "slices" + "sync" + "time" +) + +type HCloudMetrics interface { + hcloud.ServerMetrics | hcloud.LoadBalancerMetrics +} + +type RequestOpts struct { + MetricsTypes []MetricsType + TimeRange backend.TimeRange + Step int +} + +type APIRequestFn[M HCloudMetrics] func(ctx context.Context, id int64, opts RequestOpts) (*M, error) +type FilterMetricsFn[M HCloudMetrics] func(metrics *M, metricsTypes []MetricsType) *M + +// QueryRunner is responsible for getting the Metrics from the Hetzner Cloud API. +// +// The Hetzner Cloud API has endpoints that expose all metrics for a single resource (server/load-balancer). This runs +// counter to the way you would use the metrics in Grafana, where you would like to see a single metrics for +// multiple resources. +// +// The naive solution to this would send one request per resource per incoming query to the API, but this can easily +// exhaust the API rate limit. The QueryRunner instead buffers incoming requests and only sends a single request to the +// API per resource requested during the buffer period. If you show metrics from the same resource in ie. 5 panels, this +// will only send 1 request to the API instead of 5. +// +// The downside is that responses are slower, because we always wait for the buffer period to end before sending the +// requests. 
diff --git a/pkg/plugin/query_runner.go b/pkg/plugin/query_runner.go
new file mode 100644
index 0000000..31f0103
--- /dev/null
+++ b/pkg/plugin/query_runner.go
@@ -0,0 +1,271 @@
+package plugin
+
+import (
+	"context"
+	"github.com/apricote/grafana-hcloud-datasource/pkg/set"
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/hetznercloud/hcloud-go/v2/hcloud"
+	"slices"
+	"sync"
+	"time"
+)
+
+type HCloudMetrics interface {
+	hcloud.ServerMetrics | hcloud.LoadBalancerMetrics
+}
+
+type RequestOpts struct {
+	MetricsTypes []MetricsType
+	TimeRange    backend.TimeRange
+	Step         int
+}
+
+type APIRequestFn[M HCloudMetrics] func(ctx context.Context, id int64, opts RequestOpts) (*M, error)
+type FilterMetricsFn[M HCloudMetrics] func(metrics *M, metricsTypes []MetricsType) *M
+
+// QueryRunner is responsible for getting the metrics from the Hetzner Cloud API.
+//
+// The Hetzner Cloud API has endpoints that expose all metrics for a single resource (server/load-balancer). This runs
+// counter to the way you would use the metrics in Grafana, where you would like to see a single metric for
+// multiple resources.
+//
+// The naive solution would be to send one request per resource per incoming query to the API, but this can easily
+// exhaust the API rate limit. The QueryRunner instead buffers incoming requests and only sends a single request to the
+// API per resource requested during the buffer period. If you show metrics from the same resource in e.g. 5 panels,
+// this will only send 1 request to the API instead of 5.
+//
+// The downside is that responses are slower, because we always wait for the buffer period to end before sending the
+// requests.
+//
+// Internally the QueryRunner keeps a map of pending requests per resource ID and flushes it, deduplicated, once the buffer period ends.
+type QueryRunner[M HCloudMetrics] struct {
+	mutex sync.Mutex
+
+	bufferPeriod time.Duration
+	bufferTimer  *time.Timer
+
+	apiRequestFn    APIRequestFn[M]
+	filterMetricsFn FilterMetricsFn[M]
+
+	requests map[int64][]request[M]
+}
+
+func NewQueryRunner[M HCloudMetrics](bufferPeriod time.Duration, apiRequestFn APIRequestFn[M], filterMetrics FilterMetricsFn[M]) *QueryRunner[M] {
+	q := &QueryRunner[M]{
+		bufferPeriod:    bufferPeriod,
+		apiRequestFn:    apiRequestFn,
+		filterMetricsFn: filterMetrics,
+		requests:        make(map[int64][]request[M]),
+	}
+
+	return q
+}
+
+type request[M HCloudMetrics] struct {
+	opts       RequestOpts
+	responseCh chan<- response[M]
+}
+
+type response[M HCloudMetrics] struct {
+	metrics *M
+	err     error
+}
+
+// apiResponse pairs a response with the resource ID and options of the API request that
+// produced it. Declared at package level because type declarations inside generic
+// methods must not use the type parameter M.
+type apiResponse[M HCloudMetrics] struct {
+	id   int64
+	opts RequestOpts
+
+	resp response[M]
+}
+
+func (q *QueryRunner[M]) RequestMetrics(ctx context.Context, ids []int64, opts RequestOpts) ([]*M, error) {
+	responseCh := make(chan response[M], len(ids))
+	req := request[M]{
+		opts:       opts,
+		responseCh: responseCh,
+	}
+
+	q.mutex.Lock()
+	for _, id := range ids {
+		q.requests[id] = append(q.requests[id], req)
+	}
+	q.startBuffer()
+	q.mutex.Unlock()
+
+	results := make([]*M, 0, len(ids))
+
+	for len(results) < len(ids) {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case resp := <-responseCh:
+			if resp.err != nil {
+				// TODO: return partial results? cancel outgoing requests?
+				return nil, resp.err
+			}
+
+			results = append(results, resp.metrics)
+		}
+	}
+
+	return results, nil
+}
+
+// startBuffer starts the buffer timer if it's not already running. Caller must hold the mutex.
+func (q *QueryRunner[M]) startBuffer() {
+	if q.bufferTimer == nil {
+		q.bufferTimer = time.AfterFunc(q.bufferPeriod, q.sendRequests)
+	}
+}
+
+func (q *QueryRunner[M]) sendRequests() {
+	ctx := context.Background()
+
+	q.mutex.Lock()
+	// TODO: only lock mutex for deduplicating requests and sending the responses, we can still accept more requests
+	// while talking to the api
+
+	// Actual length might be larger, but it's a good starting point
+	allRequests := make([]struct {
+		id   int64
+		opts RequestOpts
+	}, 0, len(q.requests))
+
+	for id, requests := range q.requests {
+		id := id
+		allOpts := make([]RequestOpts, 0, len(requests))
+		for _, req := range requests {
+			allOpts = append(allOpts, req.opts)
+		}
+
+		uniqueOpts := uniqueRequests(allOpts)
+
+		for _, opts := range uniqueOpts {
+			allRequests = append(allRequests, struct {
+				id   int64
+				opts RequestOpts
+			}{id: id, opts: opts})
+		}
+	}
+
+	// We are finished reading from q for now, let's unlock the mutex until we need it again
+	q.mutex.Unlock()
+
+	responses := make(chan apiResponse[M])
+	wg := sync.WaitGroup{}
+	wg.Add(len(allRequests))
+	go func() {
+		wg.Wait()
+		close(responses)
+	}()
+
+	for _, req := range allRequests {
+		req := req
+		go func() {
+			defer wg.Done()
+			metrics, err := q.apiRequestFn(ctx, req.id, req.opts)
+			responses <- apiResponse[M]{
+				id:   req.id,
+				opts: req.opts,
+				resp: response[M]{metrics: metrics, err: err},
+			}
+		}()
+	}
+
+	// Lock the mutex again to get a consistent view of q.requests and to be able to delete/respond there
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	// Reset the buffer timer while still holding the mutex (deferred calls run LIFO).
+	defer q.resetBufferTimer()
+
+	// Iterate over all received responses
+	for resp := range responses {
+		// Send the response to all open requests that match it
+		// Remove all requests that have received a response from q.requests
+		newRequestsForID := make([]request[M], 0, len(q.requests[resp.id])-1)
+		for _, req := range q.requests[resp.id] {
+			if resp.opts.matches(req.opts) {
+				if resp.resp.err != nil {
+					req.responseCh <- response[M]{
+						err: resp.resp.err,
+					}
+				} else {
+					req.responseCh <- response[M]{
+						metrics: q.filterMetricsFn(resp.resp.metrics, req.opts.MetricsTypes),
+					}
+				}
+			} else {
+				newRequestsForID = append(newRequestsForID, req)
+			}
+		}
+
+		if len(newRequestsForID) == 0 {
+			delete(q.requests, resp.id)
+		} else {
+			q.requests[resp.id] = newRequestsForID
+		}
+	}
+}
+
+func (q *QueryRunner[M]) resetBufferTimer() {
+	// Reset buffer timer
+	q.bufferTimer = nil
+
+	// TODO: only required once we release mutex while making api requests
+	if len(q.requests) > 0 {
+		q.startBuffer()
+	}
+}
+
+// uniqueRequests deduplicates requests by combining requests with the same time range and step. All metrics types are added together.
+func uniqueRequests(requests []RequestOpts) []RequestOpts {
+	type key struct {
+		timeRange backend.TimeRange
+		step      int
+	}
+
+	unique := make(map[key]set.Set[MetricsType])
+
+	for _, req := range requests {
+		k := key{
+			timeRange: req.TimeRange,
+			step:      req.Step,
+		}
+
+		if _, ok := unique[k]; !ok {
+			unique[k] = set.New[MetricsType]()
+		}
+
+		unique[k].Insert(req.MetricsTypes...)
+	}
+
+	uniqueSlice := make([]RequestOpts, 0, len(unique))
+	for k, v := range unique {
+		metricsTypes := v.ToSlice()
+		slices.Sort(metricsTypes) // Make testing possible
+
+		uniqueSlice = append(uniqueSlice, RequestOpts{
+			MetricsTypes: metricsTypes,
+			TimeRange:    k.timeRange,
+			Step:         k.step,
+		})
+	}
+
+	return uniqueSlice
+}
+
+// matches returns true if a response to r can fully satisfy other.
+func (r RequestOpts) matches(other RequestOpts) bool {
+	timeRangeMatches := r.TimeRange.From == other.TimeRange.From && r.TimeRange.To == other.TimeRange.To
+	stepMatches := r.Step == other.Step
+
+	typesMatch := true
+	for _, metricsType := range other.MetricsTypes {
+		if !slices.Contains(r.MetricsTypes, metricsType) {
+			typesMatch = false
+			break
+		}
+	}
+
+	return timeRangeMatches && stepMatches && typesMatch
+}
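Not from the PR itself — a minimal usage sketch of the buffering behavior, wiring a QueryRunner to an invented stub instead of the real `Datasource.serverAPIRequestFn`:

```go
package plugin

import (
	"context"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/hetznercloud/hcloud-go/v2/hcloud"
)

// exampleQueryRunner is illustrative only; IDs and time range are invented.
func exampleQueryRunner(ctx context.Context) ([]*hcloud.ServerMetrics, error) {
	stub := func(ctx context.Context, id int64, opts RequestOpts) (*hcloud.ServerMetrics, error) {
		// The real implementation calls the Hetzner Cloud metrics endpoint here.
		return &hcloud.ServerMetrics{TimeSeries: map[string][]hcloud.ServerMetricsValue{}}, nil
	}

	runner := NewQueryRunner[hcloud.ServerMetrics](DefaultBufferPeriod, stub, filterServerMetrics)

	// All RequestMetrics calls that arrive within the buffer period and share
	// time range and step collapse into a single API request per resource ID.
	return runner.RequestMetrics(ctx, []int64{1, 2}, RequestOpts{
		MetricsTypes: []MetricsType{MetricsTypeServerCPU},
		TimeRange:    backend.TimeRange{From: time.Now().Add(-time.Hour), To: time.Now()},
		Step:         60,
	})
}
```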
diff --git a/pkg/plugin/query_runner_test.go b/pkg/plugin/query_runner_test.go
new file mode 100644
index 0000000..9c9a44b
--- /dev/null
+++ b/pkg/plugin/query_runner_test.go
@@ -0,0 +1,118 @@
+package plugin
+
+import (
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func Test_uniqueRequests(t *testing.T) {
+	var (
+		date2020 = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
+		date2021 = time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
+		date2022 = time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
+		date2023 = time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+	)
+
+	type testCase struct {
+		name     string
+		requests []RequestOpts
+		want     []RequestOpts
+	}
+	// uniqueRequests is independent of the metrics type, so a plain test case type
+	// is enough (generic type declarations are not allowed inside functions anyway).
+	tests := []testCase{
+		{
+			name: "single",
+			requests: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			}, want: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			},
+		},
+		{
+			name: "same type, same range",
+			requests: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			}, want: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			},
+		},
+		{
+			name: "different type, same range",
+			requests: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerNetwork},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			}, want: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU, MetricsTypeServerNetwork},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+			},
+		},
+		{
+			name: "same type, different range",
+			requests: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2022, To: date2023},
+					Step:         1,
+				},
+			}, want: []RequestOpts{
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2020, To: date2021},
+					Step:         1,
+				},
+				{
+					MetricsTypes: []MetricsType{MetricsTypeServerCPU},
+					TimeRange:    backend.TimeRange{From: date2022, To: date2023},
+					Step:         1,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := uniqueRequests(tt.requests); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("uniqueRequests() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/pkg/set/set.go b/pkg/set/set.go
new file mode 100644
index 0000000..ad3940f
--- /dev/null
+++ b/pkg/set/set.go
@@ -0,0 +1,34 @@
+package set
+
+type Set[T comparable] map[T]struct{}
+
+func New[T comparable]() Set[T] {
+	return make(Set[T])
+}
+
+func (s Set[T]) Insert(elements ...T) {
+	for _, element := range elements {
+		s[element] = struct{}{}
+	}
+}
+
+func (s Set[T]) Has(element T) bool {
+	_, ok := s[element]
+	return ok
+}
+
+func (s Set[T]) ToSlice() []T {
+	elements := make([]T, 0, len(s))
+
+	for element := range s {
+		elements = append(elements, element)
+	}
+
+	return elements
+}
+
+func From[T comparable](element ...T) Set[T] {
+	set := New[T]()
+	set.Insert(element...)
+	return set
+}
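The new `set` package in one glance — a short sketch with invented values:

```go
package main

import (
	"fmt"

	"github.com/apricote/grafana-hcloud-datasource/pkg/set"
)

func main() {
	s := set.From("server", "load-balancer")
	s.Insert("server", "volume") // inserting an existing element is a no-op

	fmt.Println(s.Has("volume"))  // true
	fmt.Println(len(s.ToSlice())) // 3 — slice order is not deterministic
}
```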
+ return set +} diff --git a/src/components/QueryEditor.tsx b/src/components/QueryEditor.tsx index a0f2e0d..1294f14 100644 --- a/src/components/QueryEditor.tsx +++ b/src/components/QueryEditor.tsx @@ -1,4 +1,4 @@ -import React, { useState } from 'react'; +import React, { useCallback, useState } from 'react'; import { AsyncMultiSelect, InlineField, InlineFieldRow, Select } from '@grafana/ui'; import { QueryEditorProps, SelectableValue } from '@grafana/data'; import { DataSource } from '../datasource'; @@ -40,7 +40,28 @@ export function QueryEditor({ query, onChange, onRunQuery, datasource }: Props) onRunQuery(); }; - const [formResourceIDs, setFormResourceIDs] = useState>>([]); + const { queryType, resourceType, metricsType, resourceIDs } = query; + + const multiselectLoadResources = useCallback( + async (_: string) => { + switch (resourceType) { + case 'server': { + return datasource.getServers(); + } + case 'load-balancer': { + return datasource.getLoadBalancers(); + } + } + }, + [datasource, resourceType] + ); + + // Foobar + // TODO Properly restore the selected resources after the options are loaded, + // currently we always show empty form even if the query has IDs set + const [formResourceIDs, setFormResourceIDs] = useState>>( + resourceIDs.map((id) => ({ value: id })) + ); const onResourceNameOrIDsChange = (newValues: Array>) => { onChange({ ...query, resourceIDs: newValues.map((value) => value.value!) }); onRunQuery(); @@ -48,7 +69,6 @@ export function QueryEditor({ query, onChange, onRunQuery, datasource }: Props) }; const availableMetricTypes = query.resourceType === 'server' ? ServerMetricsTypes : LoadBalancerMetricsTypes; - const { queryType, resourceType, metricsType } = query; return ( @@ -84,10 +104,11 @@ export function QueryEditor({ query, onChange, onRunQuery, datasource }: Props) @@ -95,14 +116,3 @@ export function QueryEditor({ query, onChange, onRunQuery, datasource }: Props) ); } - -const loadResources = (datasource: DataSource, resourceType: Query['resourceType']) => async (_: string) => { - switch (resourceType) { - case 'server': { - return datasource.getServers(); - } - case 'load-balancer': { - return datasource.getLoadBalancers(); - } - } -}; diff --git a/src/types.ts b/src/types.ts index 92b6de7..8ef0b4b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -21,6 +21,7 @@ export const DEFAULT_QUERY: Partial = { queryType: 'metrics', resourceType: 'server', metricsType: 'cpu', + resourceIDs: [], }; export const DEFAULT_VARIABLE_QUERY: Partial = {