feat: show server id in panels
apricote committed Dec 21, 2023
1 parent 73c1e17 commit 897106e
Showing 2 changed files with 91 additions and 110 deletions.
149 changes: 64 additions & 85 deletions pkg/plugin/datasource.go
@@ -266,8 +266,8 @@ func (d *Datasource) queryMetrics(ctx context.Context, query backend.DataQuery)
TimeRange: query.TimeRange,
Step: step,
})
for _, serverMetrics := range metrics {
resp.Frames = append(resp.Frames, serverMetricsToFrames(serverMetrics)...)
for id, serverMetrics := range metrics {
resp.Frames = append(resp.Frames, serverMetricsToFrames(id, serverMetrics)...)
}

return resp
@@ -289,12 +289,12 @@ func stepSize(timeRange backend.TimeRange, interval time.Duration, maxDataPoints
return step
}

func serverMetricsToFrames(metrics *hcloud.ServerMetrics) []*data.Frame {
func serverMetricsToFrames(id int64, metrics *hcloud.ServerMetrics) []*data.Frame {
frames := make([]*data.Frame, 0, len(metrics.TimeSeries))

// get all keys in map metrics.TimeSeries
for name, series := range metrics.TimeSeries {
frame := data.NewFrame(name)
frame := data.NewFrame("")

timestamps := make([]time.Time, 0, len(series))
values := make([]float64, 0, len(series))
@@ -310,57 +310,9 @@ func serverMetricsToFrames(metrics *hcloud.ServerMetrics) []*data.Frame {
values = append(values, parsedValue)
}

valuesField := data.NewField("values", nil, values)

switch name {
case "cpu":
valuesField.Config = &data.FieldConfig{
DisplayName: "CPU Usage",
Unit: "percent",
}
case "disk.0.iops.read":
valuesField.Config = &data.FieldConfig{
DisplayName: "Disk IOPS Read",
Unit: "iops",
}
case "disk.0.iops.write":
valuesField.Config = &data.FieldConfig{
DisplayName: "Disk IOPS Write",
Unit: "iops",
}
case "disk.0.bandwidth.read":
valuesField.Config = &data.FieldConfig{
DisplayName: "Disk Bandwidth Read",
Unit: "bytes/sec(IEC)",
}
case "disk.0.bandwidth.write":
valuesField.Config = &data.FieldConfig{
DisplayName: "Disk Bandwidth Write",
Unit: "bytes/sec(IEC)",
}
case "network.0.pps.in":
valuesField.Config = &data.FieldConfig{
DisplayName: "Network PPS Received",
Unit: "packets/sec",
}
case "network.0.pps.out":
valuesField.Config = &data.FieldConfig{
DisplayName: "Network PPS Sent",
Unit: "packets/sec",
}
case "network.0.bandwidth.in":
valuesField.Config = &data.FieldConfig{
DisplayName: "Network Bandwidth Received",
Unit: "bytes/sec(IEC)",
}
case "network.0.bandwidth.out":
valuesField.Config = &data.FieldConfig{
DisplayName: "Network Bandwidth Sent",
Unit: "bytes/sec(IEC)",
}
default:
// Unknown series, not a problem, we just do not have
// a good display name and unit
valuesField := data.NewField(serverSeriesToDisplayName[name], data.Labels{"id": strconv.FormatInt(id, 10)}, values)
valuesField.Config = &data.FieldConfig{
Unit: serverSeriesToUnit[name],
}

frame.Fields = append(frame.Fields,
@@ -397,34 +349,6 @@ func (d *Datasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequ
}, nil
}

func metricTypeToServerMetricType(metricsType MetricsType) hcloud.ServerMetricType {
switch metricsType {
case MetricsTypeServerCPU:
return hcloud.ServerMetricCPU
case MetricsTypeServerDisk:
return hcloud.ServerMetricDisk
case MetricsTypeServerNetwork:
return hcloud.ServerMetricNetwork
default:
return hcloud.ServerMetricCPU
}
}

func metricTypeToLoadBalancerMetricType(metricsType MetricsType) hcloud.LoadBalancerMetricType {
switch metricsType {
case MetricsTypeLoadBalancerOpenConnections:
return hcloud.LoadBalancerMetricOpenConnections
case MetricsTypeLoadBalancerConnectionsPerSecond:
return hcloud.LoadBalancerMetricConnectionsPerSecond
case MetricsTypeLoadBalancerRequestsPerSecond:
return hcloud.LoadBalancerMetricRequestsPerSecond
case MetricsTypeLoadBalancerBandwidth:
return hcloud.LoadBalancerMetricBandwidth
default:
return hcloud.LoadBalancerMetricOpenConnections
}
}

func (d *Datasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctxLogger := logger.FromContext(ctx)

@@ -501,7 +425,7 @@ func (d *Datasource) getLoadBalancers(ctx context.Context) ([]SelectableValue, e
func (d *Datasource) serverAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.ServerMetrics, error) {
hcloudGoMetricsTypes := make([]hcloud.ServerMetricType, 0, len(opts.MetricsTypes))
for _, metricsType := range opts.MetricsTypes {
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToServerMetricType(metricsType))
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToServerMetricType[metricsType])
}

metrics, _, err := d.client.Server.GetMetrics(ctx, &hcloud.Server{ID: id}, hcloud.ServerGetMetricsOpts{
@@ -517,7 +441,7 @@ func (d *Datasource) serverAPIRequestFn(ctx context.Context, id int64, opts Requ
func (d *Datasource) loadBalancerAPIRequestFn(ctx context.Context, id int64, opts RequestOpts) (*hcloud.LoadBalancerMetrics, error) {
hcloudGoMetricsTypes := make([]hcloud.LoadBalancerMetricType, 0, len(opts.MetricsTypes))
for _, metricsType := range opts.MetricsTypes {
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToLoadBalancerMetricType(metricsType))
hcloudGoMetricsTypes = append(hcloudGoMetricsTypes, metricTypeToLoadBalancerMetricType[metricsType])
}

metrics, _, err := d.client.LoadBalancer.GetMetrics(ctx, &hcloud.LoadBalancer{ID: id}, hcloud.LoadBalancerGetMetricsOpts{
@@ -537,12 +461,67 @@ var (
MetricsTypeServerNetwork: {"network.0.pps.in", "network.0.pps.out", "network.0.bandwidth.in", "network.0.bandwidth.out"},
}

serverSeriesToDisplayName = map[string]string{
// cpu
"cpu": "Usage",

// disk
"disk.0.iops.read": "IOPS Read",
"disk.0.iops.write": "IOPS Write",
"disk.0.bandwidth.read": "Bandwidth Read",
"disk.0.bandwidth.write": "Bandwidth Write",

// network
"network.0.pps.in": "PPS Received",
"network.0.pps.out": "PPS Sent",
"network.0.bandwidth.in": "Bandwidth Received",
"network.0.bandwidth.out": "Bandwidth Sent",
}

serverSeriesToUnit = map[string]string{
// cpu
"cpu": "percent",

// disk
"disk.0.iops.read": "iops",
"disk.0.iops.write": "iops",
"disk.0.bandwidth.read": "bytes/sec(IEC)",
"disk.0.bandwidth.write": "bytes/sec(IEC)",

// network
"network.0.pps.in": "packets/sec",
"network.0.pps.out": "packets/sec",
"network.0.bandwidth.in": "bytes/sec(IEC)",
"network.0.bandwidth.out": "bytes/sec(IEC)",
}

metricTypeToServerMetricType = map[MetricsType]hcloud.ServerMetricType{
MetricsTypeServerCPU: hcloud.ServerMetricCPU,
MetricsTypeServerDisk: hcloud.ServerMetricDisk,
MetricsTypeServerNetwork: hcloud.ServerMetricNetwork,
}

loadBalancerMetricsTypeSeries = map[MetricsType][]string{
MetricsTypeLoadBalancerOpenConnections: {"open_connections"},
MetricsTypeLoadBalancerConnectionsPerSecond: {"connections_per_second"},
MetricsTypeLoadBalancerRequestsPerSecond: {"requests_per_second"},
MetricsTypeLoadBalancerBandwidth: {"bandwidth.in", "bandwidth.out"},
}

loadBalancerSeriesToDisplayName = map[string]string{
// TODO
}

loadBalancerSeriesToUnit = map[string]string{
// TODO
}

metricTypeToLoadBalancerMetricType = map[MetricsType]hcloud.LoadBalancerMetricType{
MetricsTypeLoadBalancerOpenConnections: hcloud.LoadBalancerMetricOpenConnections,
MetricsTypeLoadBalancerConnectionsPerSecond: hcloud.LoadBalancerMetricConnectionsPerSecond,
MetricsTypeLoadBalancerRequestsPerSecond: hcloud.LoadBalancerMetricRequestsPerSecond,
MetricsTypeLoadBalancerBandwidth: hcloud.LoadBalancerMetricBandwidth,
}
)

func filterServerMetrics(metrics *hcloud.ServerMetrics, metricsTypes []MetricsType) *hcloud.ServerMetrics {
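As an aside, the essence of the datasource.go change above is that the per-series switch statement is replaced by two lookup maps, and the server id is attached to the value field as a label instead of being encoded in the display name. Below is a minimal, self-contained sketch of that pattern; the map contents and sample values are illustrative, and only the grafana-plugin-sdk-go `data` calls mirror the ones in the diff.

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// Illustrative stand-ins for the lookup maps introduced in this commit.
var seriesToDisplayName = map[string]string{"cpu": "Usage"}
var seriesToUnit = map[string]string{"cpu": "percent"}

// buildFrame shows the new pattern: one frame per series, with the server id
// attached as a field label rather than baked into the display name.
func buildFrame(id int64, name string, timestamps []time.Time, values []float64) *data.Frame {
	frame := data.NewFrame("")

	valuesField := data.NewField(seriesToDisplayName[name], data.Labels{"id": strconv.FormatInt(id, 10)}, values)
	valuesField.Config = &data.FieldConfig{Unit: seriesToUnit[name]}

	frame.Fields = append(frame.Fields,
		data.NewField("time", nil, timestamps),
		valuesField,
	)
	return frame
}

func main() {
	now := time.Now()
	frame := buildFrame(42, "cpu",
		[]time.Time{now.Add(-time.Minute), now},
		[]float64{12.5, 14.0},
	)
	fmt.Println(frame.Fields[1].Labels) // map[id:42]
}
```

Attaching the id as a label is what lets panels that query multiple servers show one distinguishable series per server without changing the display names.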
52 changes: 27 additions & 25 deletions pkg/plugin/query_runner.go
@@ -67,11 +67,16 @@ type request[M HCloudMetrics] struct {
}

type response[M HCloudMetrics] struct {
id int64
opts RequestOpts

metrics *M
err error
}

func (q *QueryRunner[M]) RequestMetrics(ctx context.Context, ids []int64, opts RequestOpts) ([]*M, error) {
// RequestMetrics requests metrics matching the arguments given.
// It returns a map of metrics keyed by id.
func (q *QueryRunner[M]) RequestMetrics(ctx context.Context, ids []int64, opts RequestOpts) (map[int64]*M, error) {
responseCh := make(chan response[M], len(ids))
req := request[M]{
opts: opts,
@@ -85,7 +90,7 @@ func (q *QueryRunner[M]) RequestMetrics(ctx context.Context, ids []int64, opts R
q.startBuffer()
q.mutex.Unlock()

results := make([]*M, 0, len(ids))
results := make(map[int64]*M, len(ids))

for len(results) < len(ids) {
select {
@@ -97,7 +102,7 @@ func (q *QueryRunner[M]) RequestMetrics(ctx context.Context, ids []int64, opts R
return nil, resp.err
}

results = append(results, resp.metrics)
results[resp.id] = resp.metrics
}
}

@@ -111,12 +116,16 @@ func (q *QueryRunner[M]) startBuffer() {
}
}

// sendRequests sends the minimal number of requests necessary to satisfy all
// requests that are in q.requests at the start of the method. It then sends
// responses to all requests that match the response, even if the request was
// only added to q.requests while the API requests were in flight. After that,
// it removes all requests that have been answered from q.requests and resets
// the buffer timer.
func (q *QueryRunner[M]) sendRequests() {
ctx := context.Background()

q.mutex.Lock()
// TODO: only lock mutex for deduplicating requests and sending the responses, we can still accept more requests
// while talking to the api
defer q.resetBufferTimer()

// Actual length might be larger, but it's a good starting point
@@ -145,14 +154,7 @@ func (q *QueryRunner[M]) sendRequests() {
// We are finished reading from q for now, lets unlock the mutex until we need it again
q.mutex.Unlock()

type responseFoo struct {
id int64
opts RequestOpts

resp response[M]
}

responses := make(chan responseFoo)
responses := make(chan response[M])
wg := sync.WaitGroup{}
wg.Add(len(allRequests))
go func() {
@@ -165,10 +167,12 @@ func (q *QueryRunner[M]) sendRequests() {
go func() {
defer wg.Done()
metrics, err := q.apiRequestFn(ctx, req.id, req.opts)
responses <- responseFoo{
responses <- response[M]{
id: req.id,
opts: req.opts,
resp: response[M]{metrics: metrics, err: err},

metrics: metrics,
err: err,
}
}()
}
@@ -184,14 +188,12 @@ func (q *QueryRunner[M]) sendRequests() {
newRequestsForID := make([]request[M], 0, len(q.requests[resp.id])-1)
for _, req := range q.requests[resp.id] {
if resp.opts.matches(req.opts) {
if resp.resp.err != nil {
req.responseCh <- response[M]{
err: resp.resp.err,
}
} else {
req.responseCh <- response[M]{
metrics: q.filterMetricsFn(resp.resp.metrics, req.opts.MetricsTypes),
}
req.responseCh <- response[M]{
id: resp.id,
opts: req.opts,

metrics: q.filterMetricsFn(resp.metrics, req.opts.MetricsTypes),
err: resp.err,
}
} else {
newRequestsForID = append(newRequestsForID, req)
@@ -207,11 +209,11 @@ func (q *QueryRunner[M]) sendRequests() {

}

// resetBufferTimer will reset the buffer timer so new requests can be sent.
// It also triggers a new buffer period if unanswered requests remain in [q.requests].
func (q *QueryRunner[M]) resetBufferTimer() {
// Reset buffer timer
q.bufferTimer = nil

// TODO: only required once we release mutex while making api requests
if len(q.requests) > 0 {
q.startBuffer()
}
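For orientation, here is a hedged, self-contained sketch of the consumer-side pattern the query_runner.go changes enable: every response now carries the id it answers, so the collector fills a map keyed by id instead of appending to an order-dependent slice. The `response` struct below mirrors the shape added in the commit, while `collect` and the sample values are illustrative stand-ins rather than the plugin's actual generic code.

```go
package main

import (
	"context"
	"fmt"
)

// response mirrors the shape added in this commit: every reply carries the id
// it belongs to, so the collector can build a map instead of relying on order.
type response struct {
	id      int64
	metrics *string // stand-in for the generic *M
	err     error
}

// collect gathers one response per requested id from responseCh, mimicking the
// receive loop in RequestMetrics.
func collect(ctx context.Context, ids []int64, responseCh <-chan response) (map[int64]*string, error) {
	results := make(map[int64]*string, len(ids))
	for len(results) < len(ids) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case resp := <-responseCh:
			if resp.err != nil {
				return nil, resp.err
			}
			results[resp.id] = resp.metrics
		}
	}
	return results, nil
}

func main() {
	ch := make(chan response, 2)
	a, b := "metrics-a", "metrics-b"
	ch <- response{id: 2, metrics: &b}
	ch <- response{id: 1, metrics: &a}

	results, err := collect(context.Background(), []int64{1, 2}, ch)
	if err != nil {
		panic(err)
	}
	fmt.Println(*results[1], *results[2]) // metrics-a metrics-b (order no longer matters)
}
```

Keying results by id is what allows queryMetrics to pass the id down to serverMetricsToFrames regardless of the order in which the buffered API requests complete.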
