Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent,billing: Move clients and queue into new pkg/reporting #1078

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 25 additions & 98 deletions pkg/agent/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package billing
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"

"go.uber.org/zap"
Expand All @@ -15,7 +13,7 @@ import (
vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type Config struct {
Expand All @@ -26,33 +24,6 @@ type Config struct {
AccumulateEverySeconds uint `json:"accumulateEverySeconds"`
}

type ClientsConfig struct {
AzureBlob *AzureBlobStorageConfig `json:"azureBlob"`
HTTP *HTTPClientConfig `json:"http"`
S3 *S3ClientConfig `json:"s3"`
}

type AzureBlobStorageConfig struct {
BaseClientConfig
billing.AzureBlobStorageClientConfig
}

type HTTPClientConfig struct {
BaseClientConfig
URL string `json:"url"`
}

type S3ClientConfig struct {
BaseClientConfig
billing.S3ClientConfig
}

type BaseClientConfig struct {
PushEverySeconds uint `json:"pushEverySeconds"`
PushRequestTimeoutSeconds uint `json:"pushRequestTimeoutSeconds"`
MaxBatchSize uint `json:"maxBatchSize"`
}

type metricsState struct {
historical map[metricsKey]vmMetricsHistory
present map[metricsKey]vmMetricsInstant
Expand Down Expand Up @@ -94,60 +65,38 @@ type vmMetricsSeconds struct {

type MetricsCollector struct {
conf *Config
clients []clientInfo
sink *reporting.EventSink[*billing.IncrementalEvent]
metrics PromMetrics
}

func NewMetricsCollector(
ctx context.Context,
parentLogger *zap.Logger,
conf *Config,
metrics PromMetrics,
) (*MetricsCollector, error) {
logger := parentLogger.Named("billing")
mc := &MetricsCollector{
conf: conf,
clients: make([]clientInfo, 0),
}

if c := conf.Clients.AzureBlob; c != nil {
client, err := billing.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig)
if err != nil {
return nil, fmt.Errorf("error creating AzureBlobStorageClient: %w", err)
}
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "azureblob",
config: c.BaseClientConfig,
})
}
if c := conf.Clients.HTTP; c != nil {
mc.clients = append(mc.clients, clientInfo{
client: billing.NewHTTPClient(c.URL, http.DefaultClient),
name: "http",
config: c.BaseClientConfig,
})
}
if c := conf.Clients.S3; c != nil {
client, err := billing.NewS3Client(ctx, c.S3ClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
logger.Info("Created S3 client", client.LogFields())
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "s3",
config: c.BaseClientConfig,
})
clients, err := createClients(ctx, logger, conf.Clients)
if err != nil {
return nil, err
}

return mc, nil
sink := reporting.NewEventSink(logger, metrics.reporting, clients...)

return &MetricsCollector{
conf: conf,
sink: sink,
metrics: metrics,
}, nil
}

func (mc *MetricsCollector) Run(
ctx context.Context,
logger *zap.Logger,
store VMStoreForNode,
metrics PromMetrics,
) error {
logger = logger.Named("collect")

collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
Expand All @@ -163,29 +112,7 @@ func (mc *MetricsCollector) Run(
pushWindowStart: time.Now(),
}

var queueWriters []eventQueuePusher[*billing.IncrementalEvent]

for _, c := range mc.clients {
qw, queueReader := newEventQueue[*billing.IncrementalEvent](metrics.queueSizeCurrent.WithLabelValues(c.name))
queueWriters = append(queueWriters, qw)

// Start the sender
signalDone, thisThreadFinished := util.NewCondChannelPair()
defer signalDone.Send() //nolint:gocritic // this defer-in-loop is intentional.
sender := eventSender{
clientInfo: c,
metrics: metrics,
queue: queueReader,
collectorFinished: thisThreadFinished,
lastSendDuration: 0,
}
go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.name)))
}

// The rest of this function is to do with collection
logger = logger.Named("collect")

state.collect(logger, store, metrics)
state.collect(logger, store, mc.metrics)

for {
select {
Expand All @@ -196,10 +123,10 @@ func (mc *MetricsCollector) Run(
logger.Panic("Validation check failed", zap.Error(err))
return err
}
state.collect(logger, store, metrics)
state.collect(logger, store, mc.metrics)
case <-accumulateTicker.C:
logger.Info("Creating billing batch")
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), queueWriters)
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), mc.sink)
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -332,18 +259,18 @@ func logAddedEvent(logger *zap.Logger, event *billing.IncrementalEvent) *billing
}

// drainEnqueue clears the current history, adding it as events to the queue
func (s *metricsState) drainEnqueue(logger *zap.Logger, conf *Config, hostname string, queues []eventQueuePusher[*billing.IncrementalEvent]) {
func (s *metricsState) drainEnqueue(
logger *zap.Logger,
conf *Config,
hostname string,
sink *reporting.EventSink[*billing.IncrementalEvent],
) {
now := time.Now()

countInBatch := 0
batchSize := 2 * len(s.historical)

// Helper function that adds an event to all queues
enqueue := func(event *billing.IncrementalEvent) {
for _, q := range queues {
q.enqueue(event)
}
}
enqueue := sink.Enqueue

for key, history := range s.historical {
history.finalizeCurrentTimeSlice()
Expand Down
125 changes: 125 additions & 0 deletions pkg/agent/billing/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package billing

// Management of billing clients

import (
"context"
"fmt"
"net/http"
"time"

"github.com/lithammer/shortuuid"
"go.uber.org/zap"

"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type ClientsConfig struct {
AzureBlob *AzureBlobStorageClientConfig `json:"azureBlob"`
HTTP *HTTPClientConfig `json:"http"`
S3 *S3ClientConfig `json:"s3"`
}

type S3ClientConfig struct {
reporting.BaseClientConfig
reporting.S3ClientConfig
PrefixInBucket string `json:"prefixInBucket"`
}

type AzureBlobStorageClientConfig struct {
reporting.BaseClientConfig
reporting.AzureBlobStorageClientConfig
PrefixInContainer string `json:"prefixInContainer"`
}

type HTTPClientConfig struct {
reporting.BaseClientConfig
URL string `json:"url"`
}

type billingClient = reporting.Client[*billing.IncrementalEvent]

func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]billingClient, error) {
var clients []billingClient

if c := cfg.HTTP; c != nil {
client := reporting.NewHTTPClient(http.DefaultClient, reporting.HTTPClientConfig{
URL: fmt.Sprintf("%s/usage_events", c.URL),
Method: http.MethodPost,
})
logger.Info("Created HTTP client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "http",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: jsonMarshalEvents, // note: NOT gzipped.
})

}
if c := cfg.AzureBlob; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer)
client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey)
if err != nil {
return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err)
}
logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
})
}
if c := cfg.S3; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket)
client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)
if err != nil {
return nil, fmt.Errorf("error creating S3 client: %w", err)
}
logger.Info("Created S3 client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
})
}

return clients, nil
}

func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.SimplifiableError) {
obj := struct {
Events []*billing.IncrementalEvent `json:"events"`
}{Events: events}

return reporting.JSONMarshalBatch(&obj)
}

func generateTraceID() string {
return shortuuid.New()
}

// Returns a function to generate keys for the placement of billing events data into blob storage.
//
// Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz
func newBlobStorageKeyGenerator(prefix string) func() string {
return func() string {
now := time.Now()
id := shortuuid.New()

return fmt.Sprintf("%s/year=%d/month=%02d/day=%02d/%s_%s.ndjson.gz",
prefix,
now.Year(), now.Month(), now.Day(),
now.Format("15:04:05Z"),
id,
)
}
}
33 changes: 6 additions & 27 deletions pkg/agent/billing/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"github.com/prometheus/client_golang/prometheus"

vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type PromMetrics struct {
reporting *reporting.EventSinkMetrics

vmsProcessedTotal *prometheus.CounterVec
vmsCurrent *prometheus.GaugeVec
queueSizeCurrent *prometheus.GaugeVec
lastSendDuration *prometheus.GaugeVec
sendErrorsTotal *prometheus.CounterVec
}

func NewPromMetrics() PromMetrics {
return PromMetrics{
reporting: reporting.NewEventSinkMetrics("autoscaling_agent_billing"),

vmsProcessedTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "autoscaling_agent_billing_vms_processed_total",
Expand All @@ -34,36 +36,13 @@ func NewPromMetrics() PromMetrics {
},
[]string{"is_endpoint", "autoscaling_enabled", "phase"},
),
queueSizeCurrent: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_agent_billing_queue_size",
Help: "Size of the billing subsystem's queue of unsent events",
},
[]string{"client"},
),
lastSendDuration: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_agent_billing_last_send_duration_seconds",
Help: "Duration, in seconds, that it took to send the latest set of billing events (or current time if ongoing)",
},
[]string{"client"},
),
sendErrorsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "autoscaling_agent_billing_send_errors_total",
Help: "Total errors from attempting to send billing events",
},
[]string{"client", "cause"},
),
}
}

func (m PromMetrics) MustRegister(reg *prometheus.Registry) {
m.reporting.MustRegister(reg)
reg.MustRegister(m.vmsProcessedTotal)
reg.MustRegister(m.vmsCurrent)
reg.MustRegister(m.queueSizeCurrent)
reg.MustRegister(m.lastSendDuration)
reg.MustRegister(m.sendErrorsTotal)
}

type batchMetrics struct {
Expand Down
Loading
Loading