Skip to content

Commit

Permalink
agent,billing: Move clients and queue into new pkg/reporting
Browse files Browse the repository at this point in the history
This commit adds a new package: pkg/reporting.

The new package contains both the HTTP/S3/Azure Blob client
implementations from pkg/billing, as well as the event queue and sender
implementation from pkg/agent/billing.

The intent is to provide a 'plug and play' infrastructure for
asynchronously sending billing events (among other event types) to a
variety of sources, with the code responsible for generating those
events only being responsible for setting up the clients and queueing
the events.
  • Loading branch information
sharnoff committed Sep 23, 2024
1 parent 1fdccdb commit 9bb7768
Show file tree
Hide file tree
Showing 16 changed files with 762 additions and 576 deletions.
123 changes: 25 additions & 98 deletions pkg/agent/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package billing
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"

"go.uber.org/zap"
Expand All @@ -15,7 +13,7 @@ import (
vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type Config struct {
Expand All @@ -26,33 +24,6 @@ type Config struct {
AccumulateEverySeconds uint `json:"accumulateEverySeconds"`
}

type ClientsConfig struct {
AzureBlob *AzureBlobStorageConfig `json:"azureBlob"`
HTTP *HTTPClientConfig `json:"http"`
S3 *S3ClientConfig `json:"s3"`
}

type AzureBlobStorageConfig struct {
BaseClientConfig
billing.AzureBlobStorageClientConfig
}

type HTTPClientConfig struct {
BaseClientConfig
URL string `json:"url"`
}

type S3ClientConfig struct {
BaseClientConfig
billing.S3ClientConfig
}

type BaseClientConfig struct {
PushEverySeconds uint `json:"pushEverySeconds"`
PushRequestTimeoutSeconds uint `json:"pushRequestTimeoutSeconds"`
MaxBatchSize uint `json:"maxBatchSize"`
}

type metricsState struct {
historical map[metricsKey]vmMetricsHistory
present map[metricsKey]vmMetricsInstant
Expand Down Expand Up @@ -94,60 +65,38 @@ type vmMetricsSeconds struct {

type MetricsCollector struct {
conf *Config
clients []clientInfo
sink *reporting.EventSink[*billing.IncrementalEvent]
metrics PromMetrics
}

func NewMetricsCollector(
ctx context.Context,
parentLogger *zap.Logger,
conf *Config,
metrics PromMetrics,
) (*MetricsCollector, error) {
logger := parentLogger.Named("billing")
mc := &MetricsCollector{
conf: conf,
clients: make([]clientInfo, 0),
}

if c := conf.Clients.AzureBlob; c != nil {
client, err := billing.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig)
if err != nil {
return nil, fmt.Errorf("error creating AzureBlobStorageClient: %w", err)
}
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "azureblob",
config: c.BaseClientConfig,
})
}
if c := conf.Clients.HTTP; c != nil {
mc.clients = append(mc.clients, clientInfo{
client: billing.NewHTTPClient(c.URL, http.DefaultClient),
name: "http",
config: c.BaseClientConfig,
})
}
if c := conf.Clients.S3; c != nil {
client, err := billing.NewS3Client(ctx, c.S3ClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
logger.Info("Created S3 client", client.LogFields())
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "s3",
config: c.BaseClientConfig,
})
clients, err := createClients(ctx, logger, conf.Clients)
if err != nil {
return nil, err
}

return mc, nil
sink := reporting.NewEventSink(logger, metrics.reporting, clients...)

return &MetricsCollector{
conf: conf,
sink: sink,
metrics: metrics,
}, nil
}

func (mc *MetricsCollector) Run(
ctx context.Context,
logger *zap.Logger,
store VMStoreForNode,
metrics PromMetrics,
) error {
logger = logger.Named("collect")

collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
Expand All @@ -163,29 +112,7 @@ func (mc *MetricsCollector) Run(
pushWindowStart: time.Now(),
}

var queueWriters []eventQueuePusher[*billing.IncrementalEvent]

for _, c := range mc.clients {
qw, queueReader := newEventQueue[*billing.IncrementalEvent](metrics.queueSizeCurrent.WithLabelValues(c.name))
queueWriters = append(queueWriters, qw)

// Start the sender
signalDone, thisThreadFinished := util.NewCondChannelPair()
defer signalDone.Send() //nolint:gocritic // this defer-in-loop is intentional.
sender := eventSender{
clientInfo: c,
metrics: metrics,
queue: queueReader,
collectorFinished: thisThreadFinished,
lastSendDuration: 0,
}
go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.name)))
}

// The rest of this function is to do with collection
logger = logger.Named("collect")

state.collect(logger, store, metrics)
state.collect(logger, store, mc.metrics)

for {
select {
Expand All @@ -196,10 +123,10 @@ func (mc *MetricsCollector) Run(
logger.Panic("Validation check failed", zap.Error(err))
return err
}
state.collect(logger, store, metrics)
state.collect(logger, store, mc.metrics)
case <-accumulateTicker.C:
logger.Info("Creating billing batch")
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), queueWriters)
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), mc.sink)
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -332,18 +259,18 @@ func logAddedEvent(logger *zap.Logger, event *billing.IncrementalEvent) *billing
}

// drainEnqueue clears the current history, adding it as events to the queue
func (s *metricsState) drainEnqueue(logger *zap.Logger, conf *Config, hostname string, queues []eventQueuePusher[*billing.IncrementalEvent]) {
func (s *metricsState) drainEnqueue(
logger *zap.Logger,
conf *Config,
hostname string,
sink *reporting.EventSink[*billing.IncrementalEvent],
) {
now := time.Now()

countInBatch := 0
batchSize := 2 * len(s.historical)

// Helper function that adds an event to all queues
enqueue := func(event *billing.IncrementalEvent) {
for _, q := range queues {
q.enqueue(event)
}
}
enqueue := sink.Enqueue

for key, history := range s.historical {
history.finalizeCurrentTimeSlice()
Expand Down
125 changes: 125 additions & 0 deletions pkg/agent/billing/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package billing

// Management of billing clients

import (
"context"
"fmt"
"net/http"
"time"

"github.com/lithammer/shortuuid"
"go.uber.org/zap"

"github.com/neondatabase/autoscaling/pkg/billing"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type ClientsConfig struct {
AzureBlob *AzureBlobStorageClientConfig `json:"azureBlob"`
HTTP *HTTPClientConfig `json:"http"`
S3 *S3ClientConfig `json:"s3"`
}

type S3ClientConfig struct {
reporting.BaseClientConfig
reporting.S3ClientConfig
PrefixInBucket string `json:"prefixInBucket"`
}

type AzureBlobStorageClientConfig struct {
reporting.BaseClientConfig
reporting.AzureBlobStorageClientConfig
PrefixInContainer string `json:"prefixInContainer"`
}

type HTTPClientConfig struct {
reporting.BaseClientConfig
URL string `json:"url"`
}

type billingClient = reporting.Client[*billing.IncrementalEvent]

func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]billingClient, error) {
var clients []billingClient

if c := cfg.HTTP; c != nil {
client := reporting.NewHTTPClient(http.DefaultClient, reporting.HTTPClientConfig{
URL: fmt.Sprintf("%s/usage_events", c.URL),
Method: http.MethodPost,
})
logger.Info("Created HTTP client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "http",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: jsonMarshalEvents, // note: NOT gzipped.
})

}
if c := cfg.AzureBlob; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer)
client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey)
if err != nil {
return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err)
}
logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
})
}
if c := cfg.S3; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket)
client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)
if err != nil {
return nil, fmt.Errorf("error creating S3 client: %w", err)
}
logger.Info("Created S3 client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
GenerateTraceID: generateTraceID,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
})
}

return clients, nil
}

func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.SimpleError) {
obj := struct {
Events []*billing.IncrementalEvent `json:"events"`
}{Events: events}

return reporting.JSONMarshalBatch(&obj)
}

func generateTraceID() string {
return shortuuid.New()
}

// Returns a function to generate keys for the placement of billing events data into blob storage.
//
// Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz
func newBlobStorageKeyGenerator(prefix string) func() string {
return func() string {
now := time.Now()
id := shortuuid.New()

return fmt.Sprintf("%s/year=%d/month=%02d/day=%02d/%s_%s.ndjson.gz",
prefix,
now.Year(), now.Month(), now.Day(),
now.Format("15:04:05Z"),
id,
)
}
}
33 changes: 6 additions & 27 deletions pkg/agent/billing/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"github.com/prometheus/client_golang/prometheus"

vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type PromMetrics struct {
reporting *reporting.EventSinkMetrics

vmsProcessedTotal *prometheus.CounterVec
vmsCurrent *prometheus.GaugeVec
queueSizeCurrent *prometheus.GaugeVec
lastSendDuration *prometheus.GaugeVec
sendErrorsTotal *prometheus.CounterVec
}

func NewPromMetrics() PromMetrics {
return PromMetrics{
reporting: reporting.NewEventSinkMetrics("autoscaling_agent_billing"),

vmsProcessedTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "autoscaling_agent_billing_vms_processed_total",
Expand All @@ -34,36 +36,13 @@ func NewPromMetrics() PromMetrics {
},
[]string{"is_endpoint", "autoscaling_enabled", "phase"},
),
queueSizeCurrent: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_agent_billing_queue_size",
Help: "Size of the billing subsystem's queue of unsent events",
},
[]string{"client"},
),
lastSendDuration: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_agent_billing_last_send_duration_seconds",
Help: "Duration, in seconds, that it took to send the latest set of billing events (or current time if ongoing)",
},
[]string{"client"},
),
sendErrorsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "autoscaling_agent_billing_send_errors_total",
Help: "Total errors from attempting to send billing events",
},
[]string{"client", "cause"},
),
}
}

func (m PromMetrics) MustRegister(reg *prometheus.Registry) {
m.reporting.MustRegister(reg)
reg.MustRegister(m.vmsProcessedTotal)
reg.MustRegister(m.vmsCurrent)
reg.MustRegister(m.queueSizeCurrent)
reg.MustRegister(m.lastSendDuration)
reg.MustRegister(m.sendErrorsTotal)
}

type batchMetrics struct {
Expand Down
Loading

0 comments on commit 9bb7768

Please sign in to comment.