diff --git a/.github/actions/set-docker-config-dir/action.yml b/.github/actions/set-docker-config-dir/action.yml deleted file mode 100644 index 5aa53a045..000000000 --- a/.github/actions/set-docker-config-dir/action.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: "Set custom docker config directory" -description: "Create a directory for docker config and set DOCKER_CONFIG" - -# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings -runs: - using: "composite" - steps: - - name: Show warning on GitHub-hosted runners - if: runner.environment == 'github-hosted' - shell: bash -euo pipefail {0} - run: | - # Using the following environment variables to find a path to the workflow file - # ${GITHUB_WORKFLOW_REF} - octocat/hello-world/.github/workflows/my-workflow.yml@refs/heads/my_branch - # ${GITHUB_REPOSITORY} - octocat/hello-world - # ${GITHUB_REF} - refs/heads/my_branch - # From https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/variables - - filename_with_ref=${GITHUB_WORKFLOW_REF#"$GITHUB_REPOSITORY/"} - filename=${filename_with_ref%"@$GITHUB_REF"} - - # https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/workflow-commands-for-github-actions#setting-a-warning-message - title='Unnecessary usage of `.github/actions/set-docker-config-dir`' - message='No need to use `.github/actions/set-docker-config-dir` action on GitHub-hosted runners' - echo "::warning file=${filename},title=${title}::${message}" - - - uses: pyTooling/Actions/with-post-step@74afc5a42a17a046c90c68cb5cfa627e5c6c5b6b # v1.0.7 - env: - DOCKER_CONFIG: .docker-custom-${{ github.run_id }}-${{ github.run_attempt }} - with: - main: | - mkdir -p "${DOCKER_CONFIG}" - echo DOCKER_CONFIG=${DOCKER_CONFIG} | tee -a ${GITHUB_ENV} - post: | - if [ -d "${DOCKER_CONFIG}" ]; then - rm -r "${DOCKER_CONFIG}" - fi diff --git a/.github/workflows/build-images.yaml b/.github/workflows/build-images.yaml index c8f131854..50ff027da 100644 --- a/.github/workflows/build-images.yaml +++ b/.github/workflows/build-images.yaml @@ -137,7 +137,7 @@ jobs: echo "info=$(git describe --tags --long --dirty)" >> $GITHUB_OUTPUT - name: set custom docker config directory - uses: ./.github/actions/set-docker-config-dir + uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193 - uses: docker/setup-buildx-action@v3 with: diff --git a/.github/workflows/build-test-vm.yaml b/.github/workflows/build-test-vm.yaml index 2c7f991be..9fb28e0da 100644 --- a/.github/workflows/build-test-vm.yaml +++ b/.github/workflows/build-test-vm.yaml @@ -75,7 +75,7 @@ jobs: retention-days: 2 - name: set custom docker config directory - uses: ./.github/actions/set-docker-config-dir + uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193 - name: login to docker hub uses: docker/login-action@v3 diff --git a/.github/workflows/check-ca-builds.yaml b/.github/workflows/check-ca-builds.yaml index 76f17f0a3..1b8d71bbc 100644 --- a/.github/workflows/check-ca-builds.yaml +++ b/.github/workflows/check-ca-builds.yaml @@ -22,7 +22,7 @@ jobs: - uses: actions/checkout@v4 - name: set custom docker config directory - uses: ./.github/actions/set-docker-config-dir + uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193 - uses: docker/setup-buildx-action@v3 - name: Login to Docker cache registry diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index b040bdd50..8d50ae5ac 100644 --- 
a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -132,7 +132,7 @@ jobs: retention-days: 2 # minimum is 1 day; 0 is default. These are only used temporarily. - name: set custom docker config directory - uses: ./.github/actions/set-docker-config-dir + uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193 - uses: docker/login-action@v3 with: diff --git a/.github/workflows/vm-example.yaml b/.github/workflows/vm-example.yaml index 394a69ddf..0b2387a22 100644 --- a/.github/workflows/vm-example.yaml +++ b/.github/workflows/vm-example.yaml @@ -3,6 +3,9 @@ name: vm-example on: schedule: - cron: '42 4 * * 2' # run once a week + pull_request: + paths: + - ".github/workflows/vm-example.yaml" workflow_dispatch: # adds ability to run this manually jobs: @@ -19,6 +22,9 @@ jobs: go-version-file: 'go.mod' timeout-minutes: 10 + - name: build daemon + run: make docker-build-daemon + - name: build vm-builder run: make bin/vm-builder diff --git a/.github/workflows/vm-kernel.yaml b/.github/workflows/vm-kernel.yaml index 67d780fdb..90d8ae3a3 100644 --- a/.github/workflows/vm-kernel.yaml +++ b/.github/workflows/vm-kernel.yaml @@ -138,7 +138,7 @@ jobs: uses: actions/checkout@v4 - name: set custom docker config directory - uses: ./.github/actions/set-docker-config-dir + uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193 - name: docker - setup buildx uses: docker/setup-buildx-action@v3 diff --git a/.golangci.yml b/.golangci.yml index ab66df909..13be222da 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -25,6 +25,7 @@ linters: - staticcheck # some rules from staticcheck.io - typecheck # typechecks code, like the compiler - unused # checks for unused constants/variables/functions/types + - gofumpt # Formatter. # explicitly enabled: - asciicheck # all identifiers are ASCII diff --git a/Makefile b/Makefile index acb1b7c8d..cbbff7dd0 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ PG16_DISK_TEST_IMG ?= pg16-disk-test:dev GOARCH ?= $(shell go env GOARCH) GOOS ?= $(shell go env GOOS) -# The target architecture for linux kernel. Possible values: amd64 or arm64. +# The target architecture for linux kernel. Possible values: amd64 or arm64. # Any other supported by linux kernel architecture could be added by introducing new build step into neonvm/hack/kernel/Dockerfile.kernel-builder KERNEL_TARGET_ARCH ?= amd64 @@ -33,6 +33,8 @@ endif # https://github.com/neondatabase/autoscaling/pull/130#issuecomment-1496276620 export GOFLAGS=-buildvcs=false +GOFUMPT_VERSION ?= v0.7.0 + # Setting SHELL to bash allows bash commands to be executed by recipes. # Options are set to exit when a recipe line exits non-zero or a piped command fails. SHELL = /usr/bin/env bash -o pipefail @@ -108,7 +110,7 @@ generate: ## Generate boilerplate DeepCopy methods, manifests, and Go client .PHONY: fmt fmt: ## Run go fmt against code. - go fmt ./... + go run mvdan.cc/gofumpt@${GOFUMPT_VERSION} -w . .PHONY: vet vet: ## Run go vet against code. @@ -119,7 +121,7 @@ vet: ## Run go vet against code. TESTARGS ?= ./... .PHONY: test -test: fmt vet envtest ## Run tests. +test: vet envtest ## Run tests. # chmodding KUBEBUILDER_ASSETS dir to make it deletable by owner, # otherwise it fails with actions/checkout on self-hosted GitHub runners # ref: https://github.com/kubernetes-sigs/controller-runtime/pull/2245 @@ -132,16 +134,19 @@ test: fmt vet envtest ## Run tests. ##@ Build .PHONY: build -build: fmt vet bin/vm-builder ## Build all neonvm binaries. 
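Much of the Go churn in the rest of this diff is mechanical output from the new `fmt` target, which now runs gofumpt pinned at `GOFUMPT_VERSION ?= v0.7.0` instead of `go fmt`. As an illustrative sketch only (names here are invented, not from the repo), the kinds of rewrites visible throughout this diff look like:

```go
// Style as gofumpt leaves it, mirroring the rewrites seen later in this diff.
package main

import (
	"fmt"
	"os"
)

// Adjacent top-level declarations get grouped into a single block
// (compare the cfg/k8sClient/testEnv vars in webhook_suite_test.go).
var (
	configPath = "/tmp/example.conf"
	fileMode   = os.FileMode(0o644) // octal literals gain the 0o prefix
)

func main() {
	// `var x = ...` becomes `x := ...` for plain initializations,
	// as in getLines() in neonvm-runner/cmd/main.go.
	mode := fileMode
	fmt.Println(configPath, mode)
}
```
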
- GOOS=linux go build -o bin/controller neonvm/main.go - GOOS=linux go build -o bin/vxlan-controller neonvm/tools/vxlan/controller/main.go - GOOS=linux go build -o bin/runner neonvm/runner/*.go +build: vet bin/vm-builder ## Build all neonvm binaries. + GOOS=linux go build -o bin/controller neonvm-controller/cmd/main.go + GOOS=linux go build -o bin/vxlan-controller neonvm-vxlan-controller/cmd/main.go + GOOS=linux go build -o bin/runner neonvm-runner/cmd/main.go + GOOS=linux go build -o bin/daemon neonvm-daemon/cmd/main.go + GOOS=linux go build -o bin/autoscaler-agent autoscaler-agent/cmd/main.go + GOOS=linux go build -o bin/scheduler autoscale-scheduler/cmd/main.go .PHONY: bin/vm-builder bin/vm-builder: ## Build vm-builder binary. - GOOS=linux CGO_ENABLED=0 go build -o bin/vm-builder -ldflags "-X main.Version=${GIT_INFO} -X main.NeonvmDaemonImage=${IMG_DAEMON}" vm-builder/main.go + GOOS=linux CGO_ENABLED=0 go build -o bin/vm-builder -ldflags "-X main.Version=${GIT_INFO} -X main.NeonvmDaemonImage=${IMG_DAEMON}" vm-builder/main.go .PHONY: run -run: fmt vet ## Run a controller from your host. +run: vet ## Run a controller from your host. go run ./neonvm/main.go .PHONY: lint diff --git a/autoscale-scheduler/deployment.yaml b/autoscale-scheduler/deployment.yaml index 409002c59..d4f7dc9e2 100644 --- a/autoscale-scheduler/deployment.yaml +++ b/autoscale-scheduler/deployment.yaml @@ -63,3 +63,13 @@ spec: - name: plugin-config-volume configMap: name: scheduler-plugin-config + + tolerations: + # Add explicit (short) tolerations for node failure, because otherwise the default of 5m + # will be used, which is unacceptably long for us. + - key: node.kubernetes.io/not-ready + tolerationSeconds: 30 + effect: NoExecute + - key: node.kubernetes.io/unreachable + tolerationSeconds: 30 + effect: NoExecute diff --git a/cluster-autoscaler/ca.branch b/cluster-autoscaler/ca.branch index 0a98806c6..c607662c7 100644 --- a/cluster-autoscaler/ca.branch +++ b/cluster-autoscaler/ca.branch @@ -1 +1 @@ -cluster-autoscaler-release-1.29 +cluster-autoscaler-release-1.29 \ No newline at end of file diff --git a/cluster-autoscaler/ca.commit b/cluster-autoscaler/ca.commit index f12c734f0..4b8ff2613 100644 --- a/cluster-autoscaler/ca.commit +++ b/cluster-autoscaler/ca.commit @@ -1 +1 @@ -d4bbc686ac02a77a6ad1362fe7bbda387e8f074a +d4bbc686ac02a77a6ad1362fe7bbda387e8f074a \ No newline at end of file diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index d8a060a41..d0bb9d95d 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -180,7 +180,7 @@ func getLines(input []byte, commentMarker []byte) [][]byte { lines := bytes.Split(input, []byte("\n")) var output [][]byte for _, currentLine := range lines { - var commentIndex = bytes.Index(currentLine, commentMarker) + commentIndex := bytes.Index(currentLine, commentMarker) if commentIndex == -1 { output = append(output, currentLine) } else { @@ -335,7 +335,7 @@ func createISO9660runtime( } } - outputFile, err := os.OpenFile(diskPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + outputFile, err := os.OpenFile(diskPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) if err != nil { return err } @@ -534,7 +534,7 @@ func createISO9660FromPath(logger *zap.Logger, diskName string, diskPath string, } } - outputFile, err := os.OpenFile(diskPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + outputFile, err := os.OpenFile(diskPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) if err != nil { return err } @@ -1733,7 +1733,7 @@ func defaultNetwork(logger *zap.Logger, 
cidr string, ports []vmv1.Port) (mac.MAC // Adding VM's IP address to the /etc/hosts, so we can access it easily from // the pod. This is particularly useful for ssh into the VM from the runner // pod. - f, err := os.OpenFile("/etc/hosts", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile("/etc/hosts", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return nil, err } diff --git a/neonvm-vxlan-controller/cmd/main.go b/neonvm-vxlan-controller/cmd/main.go index 50823a1c2..1e88080f9 100644 --- a/neonvm-vxlan-controller/cmd/main.go +++ b/neonvm-vxlan-controller/cmd/main.go @@ -31,9 +31,7 @@ const ( extraNetCidr = "10.100.0.0/16" ) -var ( - deleteIfaces = flag.Bool("delete", false, `delete VXLAN interfaces`) -) +var deleteIfaces = flag.Bool("delete", false, `delete VXLAN interfaces`) func main() { flag.Parse() @@ -192,7 +190,6 @@ func createVxlanInterface(name string, vxlanID int, ownIP string, bridgeName str } func updateFDB(vxlanName string, nodeIPs []string, ownIP string) error { - broadcastFdbMac, _ := net.ParseMAC("00:00:00:00:00:00") // get vxlan interface details @@ -248,7 +245,6 @@ func deleteLink(name string) error { } func upsertIptablesRules() error { - // manage iptables ipt, err := iptables.New(iptables.IPFamily(iptables.ProtocolIPv4), iptables.Timeout(5)) if err != nil { diff --git a/neonvm/apis/neonvm/v1/webhook_suite_test.go b/neonvm/apis/neonvm/v1/webhook_suite_test.go index 7cbc15b8b..93834fbdc 100644 --- a/neonvm/apis/neonvm/v1/webhook_suite_test.go +++ b/neonvm/apis/neonvm/v1/webhook_suite_test.go @@ -44,11 +44,13 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment -var ctx context.Context -var cancel context.CancelFunc +var ( + cfg *rest.Config + k8sClient client.Client + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc +) func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -127,7 +129,6 @@ var _ = BeforeSuite(func() { conn.Close() return nil }).Should(Succeed()) - }) var _ = AfterSuite(func() { diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index ac21b251f..434030e53 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -3,9 +3,7 @@ package billing import ( "context" "errors" - "fmt" "math" - "net/http" "time" "go.uber.org/zap" @@ -15,7 +13,7 @@ import ( vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/billing" - "github.com/neondatabase/autoscaling/pkg/util" + "github.com/neondatabase/autoscaling/pkg/reporting" ) type Config struct { @@ -26,33 +24,6 @@ type Config struct { AccumulateEverySeconds uint `json:"accumulateEverySeconds"` } -type ClientsConfig struct { - AzureBlob *AzureBlobStorageConfig `json:"azureBlob"` - HTTP *HTTPClientConfig `json:"http"` - S3 *S3ClientConfig `json:"s3"` -} - -type AzureBlobStorageConfig struct { - BaseClientConfig - billing.AzureBlobStorageClientConfig -} - -type HTTPClientConfig struct { - BaseClientConfig - URL string `json:"url"` -} - -type S3ClientConfig struct { - BaseClientConfig - billing.S3ClientConfig -} - -type BaseClientConfig struct { - PushEverySeconds uint `json:"pushEverySeconds"` - PushRequestTimeoutSeconds uint `json:"pushRequestTimeoutSeconds"` - MaxBatchSize uint `json:"maxBatchSize"` -} - type metricsState struct { historical 
map[metricsKey]vmMetricsHistory present map[metricsKey]vmMetricsInstant @@ -94,60 +65,38 @@ type vmMetricsSeconds struct { type MetricsCollector struct { conf *Config - clients []clientInfo + sink *reporting.EventSink[*billing.IncrementalEvent] + metrics PromMetrics } func NewMetricsCollector( ctx context.Context, parentLogger *zap.Logger, conf *Config, + metrics PromMetrics, ) (*MetricsCollector, error) { logger := parentLogger.Named("billing") - mc := &MetricsCollector{ - conf: conf, - clients: make([]clientInfo, 0), - } - if c := conf.Clients.AzureBlob; c != nil { - client, err := billing.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig) - if err != nil { - return nil, fmt.Errorf("error creating AzureBlobStorageClient: %w", err) - } - mc.clients = append(mc.clients, clientInfo{ - client: client, - name: "azureblob", - config: c.BaseClientConfig, - }) - } - if c := conf.Clients.HTTP; c != nil { - mc.clients = append(mc.clients, clientInfo{ - client: billing.NewHTTPClient(c.URL, http.DefaultClient), - name: "http", - config: c.BaseClientConfig, - }) - } - if c := conf.Clients.S3; c != nil { - client, err := billing.NewS3Client(ctx, c.S3ClientConfig) - if err != nil { - return nil, fmt.Errorf("failed to create S3 client: %w", err) - } - logger.Info("Created S3 client", client.LogFields()) - mc.clients = append(mc.clients, clientInfo{ - client: client, - name: "s3", - config: c.BaseClientConfig, - }) + clients, err := createClients(ctx, logger, conf.Clients) + if err != nil { + return nil, err } - return mc, nil + sink := reporting.NewEventSink(logger, metrics.reporting, clients...) + + return &MetricsCollector{ + conf: conf, + sink: sink, + metrics: metrics, + }, nil } func (mc *MetricsCollector) Run( ctx context.Context, logger *zap.Logger, store VMStoreForNode, - metrics PromMetrics, ) error { + logger = logger.Named("collect") collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds)) defer collectTicker.Stop() @@ -163,29 +112,7 @@ func (mc *MetricsCollector) Run( pushWindowStart: time.Now(), } - var queueWriters []eventQueuePusher[*billing.IncrementalEvent] - - for _, c := range mc.clients { - qw, queueReader := newEventQueue[*billing.IncrementalEvent](metrics.queueSizeCurrent.WithLabelValues(c.name)) - queueWriters = append(queueWriters, qw) - - // Start the sender - signalDone, thisThreadFinished := util.NewCondChannelPair() - defer signalDone.Send() //nolint:gocritic // this defer-in-loop is intentional. 
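Where the removed code below hand-rolls one queue plus one sender goroutine per client, the new `reporting.EventSink` owns that fan-out behind a single `Enqueue`. The `pkg/reporting` implementation is not part of this diff; as a rough illustration of the pattern it replaces (not the actual implementation), a generic fan-out sink can be as small as:

```go
package main

import "fmt"

// eventSink fans each enqueued event out to one buffered queue per client --
// the job previously done by the per-client queueWriters loop removed here.
// Illustrative sketch only, not the real pkg/reporting type.
type eventSink[E any] struct {
	queues []chan E
}

func newEventSink[E any](numClients, depth int) *eventSink[E] {
	s := &eventSink[E]{}
	for i := 0; i < numClients; i++ {
		s.queues = append(s.queues, make(chan E, depth))
	}
	return s
}

// Enqueue delivers one event to every client's queue; a per-client sender
// goroutine (elided) would drain its queue and push batches upstream.
func (s *eventSink[E]) Enqueue(event E) {
	for _, q := range s.queues {
		q <- event
	}
}

func main() {
	sink := newEventSink[string](2, 8)
	sink.Enqueue("incremental-event")
	fmt.Println(<-sink.queues[0], <-sink.queues[1])
}
```
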
- sender := eventSender{ - clientInfo: c, - metrics: metrics, - queue: queueReader, - collectorFinished: thisThreadFinished, - lastSendDuration: 0, - } - go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.name))) - } - - // The rest of this function is to do with collection - logger = logger.Named("collect") - - state.collect(logger, store, metrics) + state.collect(logger, store, mc.metrics) for { select { @@ -196,10 +123,10 @@ func (mc *MetricsCollector) Run( logger.Panic("Validation check failed", zap.Error(err)) return err } - state.collect(logger, store, metrics) + state.collect(logger, store, mc.metrics) case <-accumulateTicker.C: logger.Info("Creating billing batch") - state.drainEnqueue(logger, mc.conf, billing.GetHostname(), queueWriters) + state.drainEnqueue(logger, mc.conf, billing.GetHostname(), mc.sink) case <-ctx.Done(): return nil } @@ -332,18 +259,18 @@ func logAddedEvent(logger *zap.Logger, event *billing.IncrementalEvent) *billing } // drainEnqueue clears the current history, adding it as events to the queue -func (s *metricsState) drainEnqueue(logger *zap.Logger, conf *Config, hostname string, queues []eventQueuePusher[*billing.IncrementalEvent]) { +func (s *metricsState) drainEnqueue( + logger *zap.Logger, + conf *Config, + hostname string, + sink *reporting.EventSink[*billing.IncrementalEvent], +) { now := time.Now() countInBatch := 0 batchSize := 2 * len(s.historical) - // Helper function that adds an event to all queues - enqueue := func(event *billing.IncrementalEvent) { - for _, q := range queues { - q.enqueue(event) - } - } + enqueue := sink.Enqueue for key, history := range s.historical { history.finalizeCurrentTimeSlice() diff --git a/pkg/agent/billing/clients.go b/pkg/agent/billing/clients.go new file mode 100644 index 000000000..1a243e637 --- /dev/null +++ b/pkg/agent/billing/clients.go @@ -0,0 +1,118 @@ +package billing + +// Management of billing clients + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/lithammer/shortuuid" + "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/billing" + "github.com/neondatabase/autoscaling/pkg/reporting" +) + +type ClientsConfig struct { + AzureBlob *AzureBlobStorageClientConfig `json:"azureBlob"` + HTTP *HTTPClientConfig `json:"http"` + S3 *S3ClientConfig `json:"s3"` +} + +type S3ClientConfig struct { + reporting.BaseClientConfig + reporting.S3ClientConfig + PrefixInBucket string `json:"prefixInBucket"` +} + +type AzureBlobStorageClientConfig struct { + reporting.BaseClientConfig + reporting.AzureBlobStorageClientConfig + PrefixInContainer string `json:"prefixInContainer"` +} + +type HTTPClientConfig struct { + reporting.BaseClientConfig + URL string `json:"url"` +} + +type billingClient = reporting.Client[*billing.IncrementalEvent] + +func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]billingClient, error) { + var clients []billingClient + + if c := cfg.HTTP; c != nil { + client := reporting.NewHTTPClient(http.DefaultClient, reporting.HTTPClientConfig{ + URL: fmt.Sprintf("%s/usage_events", c.URL), + Method: http.MethodPost, + }) + logger.Info("Created HTTP client for billing events", zap.Any("config", c)) + + clients = append(clients, billingClient{ + Name: "http", + Base: client, + BaseConfig: c.BaseClientConfig, + SerializeBatch: jsonMarshalEvents, // note: NOT gzipped. 
+ }) + + } + if c := cfg.AzureBlob; c != nil { + generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer) + client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey) + if err != nil { + return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err) + } + logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c)) + + clients = append(clients, billingClient{ + Name: "azureblob", + Base: client, + BaseConfig: c.BaseClientConfig, + SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents), + }) + } + if c := cfg.S3; c != nil { + generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket) + client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey) + if err != nil { + return nil, fmt.Errorf("error creating S3 client: %w", err) + } + logger.Info("Created S3 client for billing events", zap.Any("config", c)) + + clients = append(clients, billingClient{ + Name: "s3", + Base: client, + BaseConfig: c.BaseClientConfig, + SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents), + }) + } + + return clients, nil +} + +func jsonMarshalEvents(events []*billing.IncrementalEvent) ([]byte, reporting.SimplifiableError) { + obj := struct { + Events []*billing.IncrementalEvent `json:"events"` + }{Events: events} + + return reporting.JSONMarshalBatch(&obj) +} + +// Returns a function to generate keys for the placement of billing events data into blob storage. +// +// Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz +func newBlobStorageKeyGenerator(prefix string) func() string { + return func() string { + now := time.Now() + id := shortuuid.New() + + return fmt.Sprintf("%s/year=%d/month=%02d/day=%02d/%s_%s.ndjson.gz", + prefix, + now.Year(), now.Month(), now.Day(), + now.Format("15:04:05Z"), + id, + ) + } +} diff --git a/pkg/agent/billing/indexedstore.go b/pkg/agent/billing/indexedstore.go index 563dcea29..a385ce976 100644 --- a/pkg/agent/billing/indexedstore.go +++ b/pkg/agent/billing/indexedstore.go @@ -37,10 +37,12 @@ func (i *VMNodeIndex) Add(vm *vmapi.VirtualMachine) { i.forNode[vm.UID] = vm } } + func (i *VMNodeIndex) Update(oldVM, newVM *vmapi.VirtualMachine) { i.Delete(oldVM) i.Add(newVM) } + func (i *VMNodeIndex) Delete(vm *vmapi.VirtualMachine) { // note: delete is a no-op if the key isn't present. 
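For reference, the `newBlobStorageKeyGenerator` helper added above is self-contained and can be exercised on its own. A minimal usage sketch (the function body is copied verbatim from the new clients.go; `shortuuid` is the same dependency the diff imports) prints keys of the documented shape:

```go
package main

import (
	"fmt"
	"time"

	"github.com/lithammer/shortuuid"
)

// Copied from the new pkg/agent/billing/clients.go: keys are laid out as
// prefix/year=YYYY/month=MM/day=DD/hh:mm:ssZ_{uuid}.ndjson.gz so that
// object listings shard naturally by date.
func newBlobStorageKeyGenerator(prefix string) func() string {
	return func() string {
		now := time.Now()
		id := shortuuid.New()

		return fmt.Sprintf("%s/year=%d/month=%02d/day=%02d/%s_%s.ndjson.gz",
			prefix,
			now.Year(), now.Month(), now.Day(),
			now.Format("15:04:05Z"),
			id,
		)
	}
}

func main() {
	generateKey := newBlobStorageKeyGenerator("billing")
	// e.g. billing/year=2024/month=11/day=05/13:37:42Z_{uuid}.ndjson.gz
	fmt.Println(generateKey())
}
```
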
delete(i.forNode, vm.UID) diff --git a/pkg/agent/billing/prommetrics.go b/pkg/agent/billing/prommetrics.go index fbaf89c85..104e045fe 100644 --- a/pkg/agent/billing/prommetrics.go +++ b/pkg/agent/billing/prommetrics.go @@ -8,18 +8,20 @@ import ( "github.com/prometheus/client_golang/prometheus" vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/reporting" ) type PromMetrics struct { + reporting *reporting.EventSinkMetrics + vmsProcessedTotal *prometheus.CounterVec vmsCurrent *prometheus.GaugeVec - queueSizeCurrent *prometheus.GaugeVec - lastSendDuration *prometheus.GaugeVec - sendErrorsTotal *prometheus.CounterVec } func NewPromMetrics() PromMetrics { return PromMetrics{ + reporting: reporting.NewEventSinkMetrics("autoscaling_agent_billing"), + vmsProcessedTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "autoscaling_agent_billing_vms_processed_total", @@ -34,36 +36,13 @@ func NewPromMetrics() PromMetrics { }, []string{"is_endpoint", "autoscaling_enabled", "phase"}, ), - queueSizeCurrent: prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "autoscaling_agent_billing_queue_size", - Help: "Size of the billing subsystem's queue of unsent events", - }, - []string{"client"}, - ), - lastSendDuration: prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "autoscaling_agent_billing_last_send_duration_seconds", - Help: "Duration, in seconds, that it took to send the latest set of billing events (or current time if ongoing)", - }, - []string{"client"}, - ), - sendErrorsTotal: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "autoscaling_agent_billing_send_errors_total", - Help: "Total errors from attempting to send billing events", - }, - []string{"client", "cause"}, - ), } } func (m PromMetrics) MustRegister(reg *prometheus.Registry) { + m.reporting.MustRegister(reg) reg.MustRegister(m.vmsProcessedTotal) reg.MustRegister(m.vmsCurrent) - reg.MustRegister(m.queueSizeCurrent) - reg.MustRegister(m.lastSendDuration) - reg.MustRegister(m.sendErrorsTotal) } type batchMetrics struct { @@ -90,8 +69,10 @@ func (m PromMetrics) forBatch() batchMetrics { } } -type isEndpointFlag bool -type autoscalingEnabledFlag bool +type ( + isEndpointFlag bool + autoscalingEnabledFlag bool +) func (b batchMetrics) inc(isEndpoint isEndpointFlag, autoscalingEnabled autoscalingEnabledFlag, phase vmapi.VmPhase) { key := batchMetricsLabels{ diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 5f3ae9cc0..254e7f6ae 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -9,6 +9,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/agent/billing" "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/reporting" ) type Config struct { @@ -167,7 +168,7 @@ func (c *Config) validate() error { zeroTmpl = "field %q cannot be zero" ) - validateBaseBillingConfig := func(cfg *billing.BaseClientConfig, key string) { + validateBaseReportingConfig := func(cfg *reporting.BaseClientConfig, key string) { erc.Whenf(ec, cfg.PushEverySeconds == 0, zeroTmpl, fmt.Sprintf("%s.pushEverySeconds", key)) erc.Whenf(ec, cfg.PushRequestTimeoutSeconds == 0, zeroTmpl, fmt.Sprintf("%s.pushRequestTimeoutSeconds", key)) erc.Whenf(ec, cfg.MaxBatchSize == 0, zeroTmpl, fmt.Sprintf("%s.maxBatchSize", key)) @@ -178,16 +179,16 @@ func (c *Config) validate() error { erc.Whenf(ec, c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds") erc.Whenf(ec, c.Billing.AccumulateEverySeconds == 0, zeroTmpl, 
".billing.accumulateEverySeconds") if c.Billing.Clients.AzureBlob != nil { - validateBaseBillingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azure") + validateBaseReportingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azure") erc.Whenf(ec, c.Billing.Clients.AzureBlob.Endpoint == "", emptyTmpl, ".billing.clients.azure.endpoint") erc.Whenf(ec, c.Billing.Clients.AzureBlob.Container == "", emptyTmpl, ".billing.clients.azure.container") } if c.Billing.Clients.HTTP != nil { - validateBaseBillingConfig(&c.Billing.Clients.HTTP.BaseClientConfig, ".billing.clients.http") + validateBaseReportingConfig(&c.Billing.Clients.HTTP.BaseClientConfig, ".billing.clients.http") erc.Whenf(ec, c.Billing.Clients.HTTP.URL == "", emptyTmpl, ".billing.clients.http.url") } if c.Billing.Clients.S3 != nil { - validateBaseBillingConfig(&c.Billing.Clients.S3.BaseClientConfig, "billing.clients.s3") + validateBaseReportingConfig(&c.Billing.Clients.S3.BaseClientConfig, "billing.clients.s3") erc.Whenf(ec, c.Billing.Clients.S3.Bucket == "", emptyTmpl, ".billing.clients.s3.bucket") erc.Whenf(ec, c.Billing.Clients.S3.Region == "", emptyTmpl, ".billing.clients.s3.region") erc.Whenf(ec, c.Billing.Clients.S3.PrefixInBucket == "", emptyTmpl, ".billing.clients.s3.prefixInBucket") diff --git a/pkg/agent/core/revsource/revsource.go b/pkg/agent/core/revsource/revsource.go index d28e5d0b8..646493828 100644 --- a/pkg/agent/core/revsource/revsource.go +++ b/pkg/agent/core/revsource/revsource.go @@ -64,7 +64,7 @@ func (c *RevisionSource) Observe(moment time.Time, rev vmv1.Revision) error { } idx := rev.Value - c.offset - if idx > int64(len(c.measurements)) { + if idx >= int64(len(c.measurements)) { return errors.New("revision is in the future") } diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go index d975de870..bee432816 100644 --- a/pkg/agent/core/state_test.go +++ b/pkg/agent/core/state_test.go @@ -1495,7 +1495,6 @@ func TestDownscalePivotBack(t *testing.T) { a.Do(state.UpdateSystemMetrics, newMetrics) a.Call(getDesiredResources, state, clock.Now()). 
Equals(resForCU(2)) - } } diff --git a/pkg/agent/core/testhelpers/construct.go b/pkg/agent/core/testhelpers/construct.go index a97e8a10b..d50c976e5 100644 --- a/pkg/agent/core/testhelpers/construct.go +++ b/pkg/agent/core/testhelpers/construct.go @@ -96,9 +96,11 @@ func CreateVmInfo(config InitialVmInfoConfig, opts ...VmInfoOpt) api.VmInfo { return vm } -type coreConfigModifier func(*core.Config) -type vmInfoConfigModifier func(*InitialVmInfoConfig) -type vmInfoModifier func(InitialVmInfoConfig, *api.VmInfo) +type ( + coreConfigModifier func(*core.Config) + vmInfoConfigModifier func(*InitialVmInfoConfig) + vmInfoModifier func(InitialVmInfoConfig, *api.VmInfo) +) var ( _ VmInfoOpt = vmInfoConfigModifier(nil) diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 176f854f7..38e07cec2 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -75,14 +75,14 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } } - mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing) + mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing, metrics) if err != nil { return fmt.Errorf("error creating billing metrics collector: %w", err) } tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) tg.Go("billing", func(logger *zap.Logger) error { - return mc.Run(tg.Ctx(), logger, storeForNode, metrics) + return mc.Run(tg.Ctx(), logger, storeForNode) }) tg.Go("main-loop", func(logger *zap.Logger) error { logger.Info("Entering main loop") diff --git a/pkg/agent/prommetrics.go b/pkg/agent/prommetrics.go index 01ee5fab3..dac29887b 100644 --- a/pkg/agent/prommetrics.go +++ b/pkg/agent/prommetrics.go @@ -74,8 +74,10 @@ const ( // Copied bucket values from controller runtime latency metric. We can // adjust them in the future if needed. -var buckets = []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, - 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} +var buckets = []float64{ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60, +} func makeGlobalMetrics() (GlobalMetrics, *prometheus.Registry) { reg := prometheus.NewRegistry() diff --git a/pkg/agent/runner.go b/pkg/agent/runner.go index 7d0778922..b198d17c0 100644 --- a/pkg/agent/runner.go +++ b/pkg/agent/runner.go @@ -666,7 +666,6 @@ func (r *Runner) doNeonVMRequest( // Also relevant: _, err = r.global.vmClient.NeonvmV1().VirtualMachines(r.vmName.Namespace). Patch(requestCtx, r.vmName.Name, ktypes.JSONPatchType, patchPayload, metav1.PatchOptions{}) - if err != nil { errMsg := util.RootError(err).Error() // Some error messages contain the object name. We could try to filter them all out, but diff --git a/pkg/api/VERSIONING.md b/pkg/api/VERSIONING.md deleted file mode 100644 index e32008142..000000000 --- a/pkg/api/VERSIONING.md +++ /dev/null @@ -1,135 +0,0 @@ -# API version compatibility - -This file exists to make it easy to answer the following questions: - -1. Which protocol versions does each component support? -2. Which releases of a component support a given protocol version? - -The table below should provide the necessary information. For each release, it gives the range of -supported protocol versions by each component. The topmost line - "Current" - refers to the latest -commit in this repository, possibly unreleased. 
-
-## agent<->monitor protocol
-
-Note: For v0.17.0 and below, the autoscaler-agent additionally had support for the vm-informant by
-first checking if the /register endpoint returned a 404.
-
-| Release | autoscaler-agent | VM monitor |
-|---------|------------------|------------|
-| _Current_ | v1.0 only | v1.0 only |
-| v0.28.0 | v1.0 only | v1.0 only |
-| v0.27.0 | v1.0 only | v1.0 only |
-| v0.26.0 | v1.0 only | v1.0 only |
-| v0.25.0 | v1.0 only | v1.0 only |
-| v0.24.0 | v1.0 only | v1.0 only |
-| v0.23.0 | v1.0 only | v1.0 only |
-| v0.22.0 | v1.0 only | v1.0 only |
-| v0.21.0 | v1.0 only | v1.0 only |
-| v0.20.0 | v1.0 only | v1.0 only |
-| v0.19.0 | v1.0 only | v1.0 only |
-| v0.18.0 | v1.0 only | v1.0 only |
-| v0.17.0 | v1.0 only | v1.0 only |
-| v0.16.0 | v1.0 only | v1.0 only |
-| v0.15.0 | **v1.0** only | **v1.0** only |
-
-## agent<->scheduler plugin protocol
-
-Note: Components v0.1.7 and below did not have a versioned protocol between the agent and scheduler
-plugin. We've marked those as protocol version v0.0. Scheduler plugin v0.1.7 implicitly supports
-v1.0 because the only change from v0.0 to v1.0 is having the scheduler plugin check the version
-number.
-
-| Release | autoscaler-agent | Scheduler plugin |
-|---------|------------------|------------------|
-| _Current_ | v5.0 only | v3.0-v5.0 |
-| v0.28.0 | **v5.0** only | **v3.0-v5.0** |
-| v0.27.0 | v4.0 only | v3.0-v4.0 |
-| v0.26.0 | v4.0 only | **v3.0-v4.0** |
-| v0.25.0 | v4.0 only | v1.0-v4.0 |
-| v0.24.0 | v4.0 only | v1.0-v4.0 |
-| v0.23.0 | **v4.0 only** | **v1.0-v4.0** |
-| v0.22.0 | **v3.0 only** | **v1.0-v3.0** |
-| v0.21.0 | v2.1 only | v1.0-v2.1 |
-| v0.20.0 | **v2.1 only** | **v1.0-v2.1** |
-| v0.19.0 | v2.0 only | v1.0-v2.0 |
-| v0.18.0 | v2.0 only | v1.0-v2.0 |
-| v0.17.0 | v2.0 only | v1.0-v2.0 |
-| v0.16.0 | v2.0 only | v1.0-v2.0 |
-| v0.15.0 | v2.0 only | v1.0-v2.0 |
-| v0.14.2 | v2.0 only | v1.0-v2.0 |
-| v0.14.1 | v2.0 only | v1.0-v2.0 |
-| v0.14.0 | v2.0 only | v1.0-v2.0 |
-| v0.13.3 | v2.0 only | v1.0-v2.0 |
-| v0.13.2 | v2.0 only | v1.0-v2.0 |
-| v0.13.1 | v2.0 only | v1.0-v2.0 |
-| v0.13.0 | v2.0 only | v1.0-v2.0 |
-| v0.12.2 | v2.0 only | v1.0-v2.0 |
-| v0.12.1 | v2.0 only | v1.0-v2.0 |
-| v0.12.0 | v2.0 only | v1.0-v2.0 |
-| v0.11.0 | v2.0 only | v1.0-v2.0 |
-| v0.10.0 | v2.0 only | v1.0-v2.0 |
-| v0.9.0 | v2.0 only | v1.0-v2.0 |
-| v0.8.0 | v2.0 only | v1.0-v2.0 |
-| v0.7.2 | v2.0 only | v1.0-v2.0 |
-| v0.7.1 | v2.0 only | v1.0-v2.0 |
-| v0.7.0 | **v2.0** only | **v1.0-v2.0** |
-| v0.6.0 | v1.1 only | v1.0-v1.1 |
-| v0.5.2 | v1.1 only | v1.0-v1.1 |
-| v0.5.1 | v1.1 only | v1.0-v1.1 |
-| v0.5.0 | v1.1 only | v1.0-v1.1 |
-| v0.1.17 | v1.1 only | v1.0-v1.1 |
-| v0.1.16 | v1.1 only | v1.0-v1.1 |
-| v0.1.15 | v1.1 only | v1.0-v1.1 |
-| v0.1.14 | v1.1 only | v1.0-v1.1 |
-| v0.1.13 | v1.1 only | v1.0-v1.1 |
-| v0.1.12 | v1.1 only | v1.0-v1.1 |
-| v0.1.11 | v1.1 only | v1.0-v1.1 |
-| v0.1.10 | v1.1 only | v1.0-v1.1 |
-| v0.1.9 | **v1.1** only | **v1.0-v1.1** |
-| v0.1.8 | **v1.0** only | **v1.0** only |
-| v0.1.7 | v0.0 only | **v0.0-v1.0** |
-| v0.1.6 | v0.0 only | v0.0 only |
-| v0.1.5 | v0.0 only | v0.0 only |
-| v0.1.4 | v0.0 only | v0.0 only |
-| v0.1.3 | v0.0 only | v0.0 only |
-| 0.1.2 | v0.0 only | v0.0 only |
-| 0.1.1 | v0.0 only | v0.0 only |
-| 0.1.0 | **v0.0** only | **v0.0** only |
-
-## controller<->runner protocol
-
-Note: Components v0.6.0 and below did not have a versioned protocol between the controller and the runner.

-
-| Release | controller | runner |
-|---------|------------|--------|
-| _Current_ | 1 | 1 |
-| v0.28.0 | **1** | 1 |
-| v0.27.0 | 0 - 1 | 1 |
-| v0.26.0 | 0 - 1 | 1 |
-| v0.25.0 | 0 - 1 | 1 |
-| v0.24.0 | 0 - 1 | 1 |
-| v0.23.0 | 0 - 1 | 1 |
-| v0.22.0 | 0 - 1 | 1 |
-| v0.21.0 | 0 - 1 | 1 |
-| v0.20.0 | 0 - 1 | 1 |
-| v0.19.0 | 0 - 1 | 1 |
-| v0.18.0 | 0 - 1 | 1 |
-| v0.17.0 | 0 - 1 | 1 |
-| v0.16.0 | 0 - 1 | 1 |
-| v0.15.0 | 0 - 1 | 1 |
-| v0.14.2 | 0 - 1 | 1 |
-| v0.14.1 | 0 - 1 | 1 |
-| v0.14.0 | 0 - 1 | 1 |
-| v0.13.3 | 0 - 1 | 1 |
-| v0.13.2 | 0 - 1 | 1 |
-| v0.13.1 | 0 - 1 | 1 |
-| v0.13.0 | 0 - 1 | 1 |
-| v0.12.2 | 0 - 1 | 1 |
-| v0.12.1 | 0 - 1 | 1 |
-| v0.12.0 | 0 - 1 | 1 |
-| v0.11.0 | 0 - 1 | 1 |
-| v0.10.0 | 0 - 1 | 1 |
-| v0.9.0 | 0 - 1 | 1 |
-| v0.8.0 | 0 - 1 | 1 |
-| v0.7.2 | 0 - 1 | 1 |
-| v0.7.1 | 0 - 1 | 1 |
-| v0.7.0 | 0 - 1 | 1 |
diff --git a/pkg/api/types.go b/pkg/api/types.go index d71570def..296673fe4 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -24,8 +24,6 @@ import ( // repository containing this code. Names follow semver, although this does not necessarily // guarantee support - for example, the plugin may only support a single version, even though others // may appear to be semver-compatible. -// -// Version compatibility is documented in the neighboring file VERSIONING.md. type PluginProtoVersion uint32 const ( @@ -174,7 +172,8 @@ func (v PluginProtoVersion) IncludesExtendedMetrics() bool { return v < PluginProtoV5_0 } -// AgentRequest is the type of message sent from an autoscaler-agent to the scheduler plugin +// AgentRequest is the type of message sent from an autoscaler-agent to the scheduler plugin on +// behalf of a Pod on the agent's node. // // All AgentRequests expect a PluginResponse. type AgentRequest struct { // // If the scheduler does not support this version, then it will respond with a 400 status. ProtoVersion PluginProtoVersion `json:"protoVersion"` - // Pod is the namespaced name of the pod making the request + // Pod is the namespaced name of the Pod that the autoscaler-agent is making the request on + // behalf of. Pod util.NamespacedName `json:"pod"` // ComputeUnit gives the value of the agent's configured compute unit to use for the VM. // @@ -595,8 +595,6 @@ func SerializeMonitorMessage(content any, id uint64) ([]byte, error) { // Each version of the agent<->monitor protocol is named independently from releases of the // repository containing this code. Names follow semver, although this does not necessarily // guarantee support - for example, the monitor may only support versions above v1.1. -// -// Version compatibility is documented in the neighboring file VERSIONING.md.
type MonitorProtoVersion uint32 const ( diff --git a/pkg/api/vminfo.go b/pkg/api/vminfo.go index e50c5f303..274fb174d 100644 --- a/pkg/api/vminfo.go +++ b/pkg/api/vminfo.go @@ -93,7 +93,6 @@ func NewVmMemInfo(memSlots vmapi.MemorySlots, memSlotSize resource.Quantity) VmM Use: uint16(memSlots.Use), SlotSize: Bytes(memSlotSize.Value()), } - } // VmConfig stores the autoscaling-specific "extra" configuration derived from labels and diff --git a/pkg/billing/azureblobstoragebilling.go b/pkg/billing/azureblobstoragebilling.go deleted file mode 100644 index d1b38ec51..000000000 --- a/pkg/billing/azureblobstoragebilling.go +++ /dev/null @@ -1,134 +0,0 @@ -package billing - -import ( - "context" - "fmt" - - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type AzureAuthSharedKey struct { - AccountName string `json:"accountName"` - AccountKey string `json:"accountKey"` -} - -type AzureBlobStorageClientConfig struct { - // In Azure a Container is close to a bucket in AWS S3 - Container string `json:"container"` - // Files will be created with name starting with PrefixInContainer - PrefixInContainer string `json:"prefixInContainer"` - // Example Endpoint: "https://MYSTORAGEACCOUNT.blob.core.windows.net/" - Endpoint string `json:"endpoint"` - - // - // Unexported attributes follow this comment. - // - - // Use generateKey for tests. - // Otherwise, keep empty. - generateKey func() string - // Use getClient for tests. - // Otherwise keep empty. - getClient func() (*azblob.Client, error) -} - -type AzureError struct { - Err error -} - -func (e AzureError) Error() string { - return fmt.Sprintf("Azure Blob error: %s", e.Err.Error()) -} - -func (e AzureError) Unwrap() error { - return e.Err -} - -type AzureClient struct { - cfg AzureBlobStorageClientConfig - c *azblob.Client -} - -func (c AzureClient) LogFields() zap.Field { - return zap.Inline(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { - enc.AddString("container", c.cfg.Container) - enc.AddString("prefixInContainer", c.cfg.PrefixInContainer) - enc.AddString("endpoint", c.cfg.Endpoint) - return nil - })) -} - -func (c AzureClient) generateKey() string { - return c.cfg.generateKey() -} - -func (c AzureClient) send(ctx context.Context, payload []byte, _ TraceID) error { - payload, err := compress(payload) - if err != nil { - return err - } - _, err = c.c.UploadBuffer(ctx, c.cfg.Container, c.generateKey(), payload, - &azblob.UploadBufferOptions{}, //nolint:exhaustruct // It's part of Azure SDK - ) - return handleAzureError(err) -} - -func defaultGenerateKey(cfg AzureBlobStorageClientConfig) func() string { - return func() string { - return keyTemplate(cfg.PrefixInContainer) - } -} - -func defaultGetClient(cfg AzureBlobStorageClientConfig) func() (*azblob.Client, error) { - return func() (*azblob.Client, error) { - //nolint:exhaustruct // It's part of Azure SDK - clientOptions := &azblob.ClientOptions{ - ClientOptions: azcore.ClientOptions{ - Telemetry: policy.TelemetryOptions{ApplicationID: "neon-autoscaler"}, - }, - } - - credential, err := azidentity.NewDefaultAzureCredential(nil) - if err != nil { - return nil, err - } - client, err := azblob.NewClient(cfg.Endpoint, credential, clientOptions) - if err != nil { - return nil, &AzureError{err} - } - return client, nil - } -} - -func NewAzureBlobStorageClient(cfg 
AzureBlobStorageClientConfig) (*AzureClient, error) { - var client *azblob.Client - - if cfg.generateKey == nil { - cfg.generateKey = defaultGenerateKey(cfg) - } - - if cfg.getClient == nil { - cfg.getClient = defaultGetClient(cfg) - } - client, err := cfg.getClient() - if err != nil { - return nil, err - } - - return &AzureClient{ - cfg: cfg, - c: client, - }, nil -} - -func handleAzureError(err error) error { - if err == nil { - return nil - } - return AzureError{err} -} diff --git a/pkg/billing/client.go b/pkg/billing/client.go index 5ba8ce922..96d36825a 100644 --- a/pkg/billing/client.go +++ b/pkg/billing/client.go @@ -1,21 +1,10 @@ package billing import ( - "bytes" - "compress/gzip" - "context" - "encoding/json" "fmt" "math/rand" - "net/http" "os" "time" - - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/lithammer/shortuuid" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var hostname string @@ -35,168 +24,6 @@ func GetHostname() string { return hostname } -type Client interface { - LogFields() zap.Field - send(ctx context.Context, payload []byte, traceID TraceID) error -} - -type TraceID string - -func GenerateTraceID() TraceID { - return TraceID(shortuuid.New()) -} - -type HTTPClient struct { - URL string - httpc *http.Client -} - -func NewHTTPClient(url string, c *http.Client) HTTPClient { - return HTTPClient{URL: fmt.Sprintf("%s/usage_events", url), httpc: c} -} - -func (c HTTPClient) send(ctx context.Context, payload []byte, traceID TraceID) error { - r, err := http.NewRequestWithContext(ctx, http.MethodPost, c.URL, bytes.NewReader(payload)) - if err != nil { - return RequestError{Err: err} - } - r.Header.Set("content-type", "application/json") - r.Header.Set("x-trace-id", string(traceID)) - - resp, err := c.httpc.Do(r) - if err != nil { - return RequestError{Err: err} - } - defer resp.Body.Close() - - // theoretically if wanted/needed, we should use an http handler that - // does the retrying, to avoid writing that logic here. 
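The old client.go gzipped payloads inside each send() (see the compress helper removed further below); the new clients.go instead composes serialization steps up front via reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents). The reporting package's implementation is not part of this diff, and its real serializers return reporting.SimplifiableError rather than plain error; as a simplified, hypothetical sketch of that composition:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// batchSerializer mirrors the shape implied by the SerializeBatch fields in
// clients.go: a batch of events in, payload bytes out. (Hypothetical type --
// the real pkg/reporting definitions are not shown in this diff.)
type batchSerializer[E any] func(events []E) ([]byte, error)

// wrapSerialize composes a post-processing step (such as gzip) over a base
// serializer, analogous to reporting.WrapSerialize(reporting.GZIPCompress, ...).
func wrapSerialize[E any](post func([]byte) ([]byte, error), base batchSerializer[E]) batchSerializer[E] {
	return func(events []E) ([]byte, error) {
		payload, err := base(events)
		if err != nil {
			return nil, err
		}
		return post(payload)
	}
}

// gzipCompress matches the behavior of the compress() helper deleted from
// pkg/billing/client.go.
func gzipCompress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	gzW := gzip.NewWriter(&buf)
	if _, err := gzW.Write(payload); err != nil {
		return nil, err
	}
	if err := gzW.Close(); err != nil { // must close before reading the buffer
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	toJSON := func(events []string) ([]byte, error) {
		return json.Marshal(struct {
			Events []string `json:"events"`
		}{Events: events})
	}
	serialize := wrapSerialize(gzipCompress, toJSON)
	payload, _ := serialize([]string{"event-1", "event-2"})
	fmt.Printf("%d gzipped bytes\n", len(payload))
}
```
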
- if resp.StatusCode != http.StatusOK { - return UnexpectedStatusCodeError{StatusCode: resp.StatusCode} - } - - return nil -} - -func (c HTTPClient) LogFields() zap.Field { - return zap.String("url", c.URL) -} - -type S3ClientConfig struct { - Bucket string `json:"bucket"` - Region string `json:"region"` - PrefixInBucket string `json:"prefixInBucket"` - Endpoint string `json:"endpoint"` -} - -type S3Client struct { - cfg S3ClientConfig - client *s3.Client -} - -type S3Error struct { - Err error -} - -func (e S3Error) Error() string { - return fmt.Sprintf("S3 error: %s", e.Err.Error()) -} - -func (e S3Error) Unwrap() error { - return e.Err -} - -func NewS3Client(ctx context.Context, cfg S3ClientConfig) (*S3Client, error) { - // Timeout in case we have hidden IO inside config creation - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - s3Config, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region)) - - if err != nil { - return nil, S3Error{Err: err} - } - - client := s3.NewFromConfig(s3Config, func(o *s3.Options) { - if cfg.Endpoint != "" { - o.BaseEndpoint = &cfg.Endpoint - } - o.UsePathStyle = true // required for minio - }) - - return &S3Client{ - cfg: cfg, - client: client, - }, nil -} - -// Example: prefixInContainer/year=2021/month=01/day=26/hh:mm:ssZ_{uuid}.ndjson.gz -func keyTemplate(prefix string) string { - now := time.Now() - id := shortuuid.New() - - return fmt.Sprintf("%s/year=%d/month=%02d/day=%02d/%s_%s.ndjson.gz", - prefix, - now.Year(), now.Month(), now.Day(), - now.Format("15:04:05Z"), - id, - ) -} - -func (c S3Client) generateKey() string { - return keyTemplate(c.cfg.PrefixInBucket) -} - -func (c S3Client) LogFields() zap.Field { - return zap.Inline(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { - enc.AddString("bucket", c.cfg.Bucket) - enc.AddString("prefixInBucket", c.cfg.PrefixInBucket) - enc.AddString("region", c.cfg.Region) - enc.AddString("endpoint", c.cfg.Endpoint) - return nil - })) -} - -func compress(payload []byte) ([]byte, error) { - buf := bytes.Buffer{} - - gzW := gzip.NewWriter(&buf) - _, err := gzW.Write(payload) - if err != nil { - return nil, err - } - - err = gzW.Close() // Have to close it before reading the buffer - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func (c S3Client) send(ctx context.Context, payload []byte, _ TraceID) error { - // Source of truth for the storage format: - // https://github.com/neondatabase/cloud/issues/11199#issuecomment-1992549672 - - key := c.generateKey() - payload, err := compress(payload) - if err != nil { - return S3Error{Err: err} - } - - r := bytes.NewReader(payload) - _, err = c.client.PutObject(ctx, &s3.PutObjectInput{ //nolint:exhaustruct // AWS SDK - Bucket: &c.cfg.Bucket, - Key: &key, - Body: r, - }) - - if err != nil { - return S3Error{Err: err} - } - - return nil -} - // Enrich sets the event's Type and IdempotencyKey fields, so that users of this API don't need to // manually set them func Enrich[E Event](now time.Time, hostname string, countInBatch, batchSize int, event E) E { @@ -213,54 +40,3 @@ func Enrich[E Event](now time.Time, hostname string, countInBatch, batchSize int return event } - -// Send attempts to push the events to the remote endpoint. -// -// On failure, the error is guaranteed to be one of: JSONError, RequestError, or -// UnexpectedStatusCodeError. 
-func Send[E Event](ctx context.Context, client Client, traceID TraceID, events []E) error { - if len(events) == 0 { - return nil - } - - payload, err := json.Marshal(struct { - Events []E `json:"events"` - }{Events: events}) - if err != nil { - return JSONError{Err: err} - } - - return client.send(ctx, payload, traceID) -} - -type JSONError struct { - Err error -} - -func (e JSONError) Error() string { - return fmt.Sprintf("Error marshaling events: %s", e.Err.Error()) -} - -func (e JSONError) Unwrap() error { - return e.Err -} - -type RequestError struct { - Err error -} - -func (e RequestError) Error() string { - return fmt.Sprintf("Error making request: %s", e.Err.Error()) -} - -func (e RequestError) Unwrap() error { - return e.Err -} - -type UnexpectedStatusCodeError struct { - StatusCode int -} - -func (e UnexpectedStatusCodeError) Error() string { - return fmt.Sprintf("Unexpected HTTP status code %d", e.StatusCode) -} diff --git a/pkg/neonvm/controllers/functests/suite_test.go b/pkg/neonvm/controllers/functests/suite_test.go index a8d339996..87a3c3ba8 100644 --- a/pkg/neonvm/controllers/functests/suite_test.go +++ b/pkg/neonvm/controllers/functests/suite_test.go @@ -36,9 +36,11 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment +var ( + cfg *rest.Config + k8sClient client.Client + testEnv *envtest.Environment +) func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -69,7 +71,6 @@ var _ = BeforeSuite(func() { k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) - }) var _ = AfterSuite(func() { diff --git a/pkg/neonvm/controllers/functests/vm_controller_test.go b/pkg/neonvm/controllers/functests/vm_controller_test.go index 8b3987b6c..47ab7f5cc 100644 --- a/pkg/neonvm/controllers/functests/vm_controller_test.go +++ b/pkg/neonvm/controllers/functests/vm_controller_test.go @@ -36,7 +36,6 @@ import ( var _ = Describe("VirtualMachine controller", func() { Context("VirtualMachine controller test", func() { - const VirtualMachineName = "test-virtualmachine" ctx := context.Background() diff --git a/pkg/neonvm/controllers/metrics.go b/pkg/neonvm/controllers/metrics.go index 9ec978c59..083f3a7b9 100644 --- a/pkg/neonvm/controllers/metrics.go +++ b/pkg/neonvm/controllers/metrics.go @@ -33,8 +33,10 @@ const OutcomeLabel = "outcome" func MakeReconcilerMetrics() ReconcilerMetrics { // Copied bucket values from controller runtime latency metric. We can // adjust them in the future if needed. 
- buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, - 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} + buckets := []float64{ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60, + } m := ReconcilerMetrics{ failing: util.RegisterMetric(metrics.Registry, prometheus.NewGaugeVec( diff --git a/pkg/neonvm/controllers/vm_controller.go b/pkg/neonvm/controllers/vm_controller.go index 7a77668b6..92058f2be 100644 --- a/pkg/neonvm/controllers/vm_controller.go +++ b/pkg/neonvm/controllers/vm_controller.go @@ -311,6 +311,42 @@ func (r *VMReconciler) updateVMStatusMemory( } } +func (r *VMReconciler) acquireOverlayIP(ctx context.Context, vm *vmv1.VirtualMachine) error { + if vm.Spec.ExtraNetwork == nil || !vm.Spec.ExtraNetwork.Enable || len(vm.Status.ExtraNetIP) != 0 { + // If the VM has extra network disabled or already has an IP, do nothing. + return nil + } + + log := log.FromContext(ctx) + + // Create IPAM object + nadName, err := nadIpamName() + if err != nil { + return err + } + nadNamespace, err := nadIpamNamespace() + if err != nil { + return err + } + ipam, err := ipam.New(ctx, nadName, nadNamespace) + if err != nil { + log.Error(err, "failed to create IPAM") + return err + } + defer ipam.Close() + ip, err := ipam.AcquireIP(ctx, vm.Name, vm.Namespace) + if err != nil { + log.Error(err, "fail to acquire IP") + return err + } + message := fmt.Sprintf("Acquired IP %s for overlay network interface", ip.String()) + log.Info(message) + vm.Status.ExtraNetIP = ip.IP.String() + vm.Status.ExtraNetMask = fmt.Sprintf("%d.%d.%d.%d", ip.Mask[0], ip.Mask[1], ip.Mask[2], ip.Mask[3]) + r.Recorder.Event(vm, "Normal", "OverlayNet", message) + return nil +} + func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) error { log := log.FromContext(ctx) @@ -342,35 +378,8 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) switch vm.Status.Phase { case "": - // Acquire overlay IP address - if vm.Spec.ExtraNetwork != nil && - vm.Spec.ExtraNetwork.Enable && - len(vm.Status.ExtraNetIP) == 0 { - // Create IPAM object - nadName, err := nadIpamName() - if err != nil { - return err - } - nadNamespace, err := nadIpamNamespace() - if err != nil { - return err - } - ipam, err := ipam.New(ctx, nadName, nadNamespace) - if err != nil { - log.Error(err, "failed to create IPAM") - return err - } - defer ipam.Close() - ip, err := ipam.AcquireIP(ctx, vm.Name, vm.Namespace) - if err != nil { - log.Error(err, "fail to acquire IP") - return err - } - message := fmt.Sprintf("Acquired IP %s for overlay network interface", ip.String()) - log.Info(message) - vm.Status.ExtraNetIP = ip.IP.String() - vm.Status.ExtraNetMask = fmt.Sprintf("%d.%d.%d.%d", ip.Mask[0], ip.Mask[1], ip.Mask[2], ip.Mask[3]) - r.Recorder.Event(vm, "Normal", "OverlayNet", message) + if err := r.acquireOverlayIP(ctx, vm); err != nil { + return err } // VirtualMachine just created, change Phase to "Pending" vm.Status.Phase = vmv1.VmPending @@ -457,10 +466,12 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) vm.Status.PodIP = vmRunner.Status.PodIP vm.Status.Phase = vmv1.VmRunning meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, + metav1.Condition{ + Type: 
typeAvailableVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) created successfully", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) created successfully", vm.Status.PodName, vm.Name), + }) { // Calculating VM startup latency metrics now := time.Now() @@ -475,24 +486,21 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) case runnerSucceeded: vm.Status.Phase = vmv1.VmSucceeded meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, + metav1.Condition{ + Type: typeAvailableVirtualMachine, Status: metav1.ConditionFalse, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name), + }) case runnerFailed: vm.Status.Phase = vmv1.VmFailed meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachine, + metav1.Condition{ + Type: typeDegradedVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name)}) - case runnerUnknown: - vm.Status.Phase = vmv1.VmPending - meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) in Unknown phase", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name), + }) default: // do nothing } @@ -507,10 +515,12 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) vm.Status.PodName)) vm.Status.Phase = vmv1.VmFailed meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachine, + metav1.Condition{ + Type: typeDegradedVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) not found", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) not found", vm.Status.PodName, vm.Name), + }) } else if err != nil { log.Error(err, "Failed to get runner Pod") return err @@ -595,24 +605,21 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) case runnerSucceeded: vm.Status.Phase = vmv1.VmSucceeded meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, + metav1.Condition{ + Type: typeAvailableVirtualMachine, Status: metav1.ConditionFalse, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name), + }) case runnerFailed: vm.Status.Phase = vmv1.VmFailed meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachine, + metav1.Condition{ + Type: typeDegradedVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name)}) - case runnerUnknown: - vm.Status.Phase = vmv1.VmPending - meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: 
fmt.Sprintf("Pod (%s) for VirtualMachine (%s) in Unknown phase", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name), + }) default: // do nothing } @@ -628,10 +635,12 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) vm.Status.PodName)) vm.Status.Phase = vmv1.VmFailed meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachine, + metav1.Condition{ + Type: typeDegradedVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) not found", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) not found", vm.Status.PodName, vm.Name), + }) } else if err != nil { log.Error(err, "Failed to get runner Pod") return err @@ -648,26 +657,22 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) case runnerSucceeded: vm.Status.Phase = vmv1.VmSucceeded meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, + metav1.Condition{ + Type: typeAvailableVirtualMachine, Status: metav1.ConditionFalse, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) succeeded", vm.Status.PodName, vm.Name), + }) return nil case runnerFailed: vm.Status.Phase = vmv1.VmFailed meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachine, + metav1.Condition{ + Type: typeDegradedVirtualMachine, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name)}) - return nil - case runnerUnknown: - vm.Status.Phase = vmv1.VmPending - meta.SetStatusCondition(&vm.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachine, - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) in Unknown phase", vm.Status.PodName, vm.Name)}) + Message: fmt.Sprintf("Pod (%s) for VirtualMachine (%s) failed", vm.Status.PodName, vm.Name), + }) return nil default: // do nothing @@ -926,7 +931,6 @@ func (r *VMReconciler) doDIMMSlotsScaling(ctx context.Context, vm *vmv1.VirtualM type runnerStatusKind string const ( - runnerUnknown runnerStatusKind = "Unknown" runnerPending runnerStatusKind = "Pending" runnerRunning runnerStatusKind = "Running" runnerFailed runnerStatusKind = "Failed" @@ -953,8 +957,6 @@ func runnerStatus(pod *corev1.Pod) runnerStatusKind { return runnerSucceeded case corev1.PodFailed: return runnerFailed - case corev1.PodUnknown: - return runnerUnknown case corev1.PodRunning: return runnerRunning default: @@ -1330,7 +1332,7 @@ func getRunnerCgroup(ctx context.Context, vm *vmv1.VirtualMachine) (*api.VCPUCgr // imageForVirtualMachine gets the Operand image which is managed by this controller // from the VM_RUNNER_IMAGE environment variable defined in the config/manager/manager.yaml func imageForVmRunner() (string, error) { - var imageEnvVar = "VM_RUNNER_IMAGE" + imageEnvVar := "VM_RUNNER_IMAGE" image, found := os.LookupEnv(imageEnvVar) if !found { return "", fmt.Errorf("unable to find %s environment variable with the image", imageEnvVar) @@ -1555,7 +1557,7 @@ func podSpec( { Key: "ssh-privatekey", Path: "id_ed25519", - Mode: lo.ToPtr[int32](0600), + Mode: lo.ToPtr[int32](0o600), }, }, }, @@ -1570,7 +1572,7 @@ func 
podSpec( { Key: "ssh-publickey", Path: "authorized_keys", - Mode: lo.ToPtr[int32](0644), + Mode: lo.ToPtr[int32](0o644), }, }, }, diff --git a/pkg/neonvm/controllers/vm_qmp_queries.go b/pkg/neonvm/controllers/vm_qmp_queries.go index 3181d88da..54823b20b 100644 --- a/pkg/neonvm/controllers/vm_qmp_queries.go +++ b/pkg/neonvm/controllers/vm_qmp_queries.go @@ -801,7 +801,6 @@ func QmpGetMemorySize(ip string, port int32) (*resource.Quantity, error) { } func QmpStartMigration(virtualmachine *vmv1.VirtualMachine, virtualmachinemigration *vmv1.VirtualMachineMigration) error { - // QMP port port := virtualmachine.Spec.QMP diff --git a/pkg/neonvm/controllers/vmmigration_controller.go b/pkg/neonvm/controllers/vmmigration_controller.go index 2a7160170..0066cdd5a 100644 --- a/pkg/neonvm/controllers/vmmigration_controller.go +++ b/pkg/neonvm/controllers/vmmigration_controller.go @@ -155,10 +155,12 @@ func (r *VirtualMachineMigrationReconciler) Reconcile(ctx context.Context, req c message := fmt.Sprintf("VM (%s) not found", migration.Spec.VmName) r.Recorder.Event(migration, "Warning", "Failed", message) meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachineMigration, + metav1.Condition{ + Type: typeDegradedVirtualMachineMigration, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: message}) + Message: message, + }) migration.Status.Phase = vmv1.VmmFailed return r.updateMigrationStatus(ctx, migration) } @@ -349,10 +351,12 @@ func (r *VirtualMachineMigrationReconciler) Reconcile(ctx context.Context, req c log.Info(message) r.Recorder.Event(migration, "Normal", "Started", message) meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachineMigration, + metav1.Condition{ + Type: typeAvailableVirtualMachineMigration, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: message}) + Message: message, + }) // finally update migration phase to Running migration.Status.Phase = vmv1.VmmRunning return r.updateMigrationStatus(ctx, migration) @@ -363,10 +367,12 @@ func (r *VirtualMachineMigrationReconciler) Reconcile(ctx context.Context, req c log.Info(message) r.Recorder.Event(migration, "Warning", "Failed", message) meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachineMigration, + metav1.Condition{ + Type: typeDegradedVirtualMachineMigration, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: message}) + Message: message, + }) migration.Status.Phase = vmv1.VmmFailed return r.updateMigrationStatus(ctx, migration) case runnerFailed: @@ -374,23 +380,14 @@ func (r *VirtualMachineMigrationReconciler) Reconcile(ctx context.Context, req c log.Info(message) r.Recorder.Event(migration, "Warning", "Failed", message) meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachineMigration, + metav1.Condition{ + Type: typeDegradedVirtualMachineMigration, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: message}) + Message: message, + }) migration.Status.Phase = vmv1.VmmFailed return r.updateMigrationStatus(ctx, migration) - case runnerUnknown: - message := fmt.Sprintf("Target Pod (%s) in Unknown phase", targetRunner.Name) - log.Info(message) - r.Recorder.Event(migration, "Warning", "Unknown", message) - meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeAvailableVirtualMachineMigration, - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: 
message}) - migration.Status.Phase = vmv1.VmmPending - return r.updateMigrationStatus(ctx, migration) default: // not sure what to do, so try requeue return ctrl.Result{RequeueAfter: time.Second}, nil @@ -405,10 +402,12 @@ func (r *VirtualMachineMigrationReconciler) Reconcile(ctx context.Context, req c message := fmt.Sprintf("Target Pod (%s) disappeared", migration.Status.TargetPodName) r.Recorder.Event(migration, "Error", "NotFound", message) meta.SetStatusCondition(&migration.Status.Conditions, - metav1.Condition{Type: typeDegradedVirtualMachineMigration, + metav1.Condition{ + Type: typeDegradedVirtualMachineMigration, Status: metav1.ConditionTrue, Reason: "Reconciling", - Message: message}) + Message: message, + }) migration.Status.Phase = vmv1.VmmFailed return r.updateMigrationStatus(ctx, migration) } else if err != nil { diff --git a/pkg/neonvm/ipam/allocate.go b/pkg/neonvm/ipam/allocate.go index 4d1d39882..46079ddd9 100644 --- a/pkg/neonvm/ipam/allocate.go +++ b/pkg/neonvm/ipam/allocate.go @@ -17,7 +17,6 @@ func doAcquire( vmName string, vmNamespace string, ) (net.IPNet, []whereaboutstypes.IPReservation, error) { - // reduce whereabouts logging whereaboutslogging.SetLogLevel("error") @@ -48,7 +47,6 @@ func doRelease( vmName string, vmNamespace string, ) (net.IPNet, []whereaboutstypes.IPReservation, error) { - // reduce whereabouts logging whereaboutslogging.SetLogLevel("error") diff --git a/pkg/neonvm/ipam/demo/ipam.go b/pkg/neonvm/ipam/demo/ipam.go index bbb8677f2..7d556c090 100644 --- a/pkg/neonvm/ipam/demo/ipam.go +++ b/pkg/neonvm/ipam/demo/ipam.go @@ -27,7 +27,6 @@ var ( ) func main() { - opts := zap.Options{ //nolint:exhaustruct // typical options struct; not all fields expected to be filled. Development: true, StacktraceLevel: zapcore.Level(zapcore.PanicLevel), @@ -90,5 +89,4 @@ func main() { time.Sleep(time.Millisecond * 200) } wg.Wait() - } diff --git a/pkg/neonvm/ipam/ipam.go b/pkg/neonvm/ipam/ipam.go index 4e60e08dd..06a7e7a0d 100644 --- a/pkg/neonvm/ipam/ipam.go +++ b/pkg/neonvm/ipam/ipam.go @@ -68,7 +68,6 @@ func (i *IPAM) ReleaseIP(ctx context.Context, vmName string, vmNamespace string) // New returns a new IPAM object with ipam config and k8s/crd clients func New(ctx context.Context, nadName string, nadNamespace string) (*IPAM, error) { - // get Kubernetes client config cfg, err := config.GetConfig() if err != nil { @@ -171,7 +170,6 @@ func LoadFromNad(nadConfig string, nadNamespace string) (*IPAMConfig, error) { // Performing IPAM actions with Leader Election to avoid duplicates func (i *IPAM) acquireORrelease(ctx context.Context, vmName string, vmNamespace string, action int) (net.IPNet, error) { - var ip net.IPNet var err error var ipamerr error diff --git a/pkg/plugin/dumpstate.go b/pkg/plugin/dumpstate.go index 69ba3c9ca..504fd81ef 100644 --- a/pkg/plugin/dumpstate.go +++ b/pkg/plugin/dumpstate.go @@ -214,7 +214,6 @@ func (s *nodeState) dump() nodeStateDump { } func (s *podState) dump() podStateDump { - var vm *vmPodState if s.vm != nil { vm = lo.ToPtr(s.vm.dump()) diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index bf1b85cd3..40d3f9ce8 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -22,8 +22,10 @@ import ( "github.com/neondatabase/autoscaling/pkg/util/watch" ) -const Name = "AutoscaleEnforcer" -const LabelPluginCreatedMigration = "autoscaling.neon.tech/created-by-scheduler" +const ( + Name = "AutoscaleEnforcer" + LabelPluginCreatedMigration = "autoscaling.neon.tech/created-by-scheduler" +) // AutoscaleEnforcer is the scheduler plugin to
coordinate autoscaling type AutoscaleEnforcer struct { @@ -40,16 +42,20 @@ type AutoscaleEnforcer struct { } // abbreviations, because these types are pretty verbose -type IndexedVMStore = watch.IndexedStore[vmapi.VirtualMachine, *watch.NameIndex[vmapi.VirtualMachine]] -type IndexedNodeStore = watch.IndexedStore[corev1.Node, *watch.FlatNameIndex[corev1.Node]] +type ( + IndexedVMStore = watch.IndexedStore[vmapi.VirtualMachine, *watch.NameIndex[vmapi.VirtualMachine]] + IndexedNodeStore = watch.IndexedStore[corev1.Node, *watch.FlatNameIndex[corev1.Node]] +) // Compile-time checks that AutoscaleEnforcer actually implements the interfaces we want it to -var _ framework.Plugin = (*AutoscaleEnforcer)(nil) -var _ framework.PreFilterPlugin = (*AutoscaleEnforcer)(nil) -var _ framework.PostFilterPlugin = (*AutoscaleEnforcer)(nil) -var _ framework.FilterPlugin = (*AutoscaleEnforcer)(nil) -var _ framework.ScorePlugin = (*AutoscaleEnforcer)(nil) -var _ framework.ReservePlugin = (*AutoscaleEnforcer)(nil) +var ( + _ framework.Plugin = (*AutoscaleEnforcer)(nil) + _ framework.PreFilterPlugin = (*AutoscaleEnforcer)(nil) + _ framework.PostFilterPlugin = (*AutoscaleEnforcer)(nil) + _ framework.FilterPlugin = (*AutoscaleEnforcer)(nil) + _ framework.ScorePlugin = (*AutoscaleEnforcer)(nil) + _ framework.ReservePlugin = (*AutoscaleEnforcer)(nil) +) func NewAutoscaleEnforcerPlugin(ctx context.Context, logger *zap.Logger, config *Config) func(context.Context, runtime.Object, framework.Handle) (framework.Plugin, error) { return func(_ctx context.Context, obj runtime.Object, h framework.Handle) (framework.Plugin, error) { diff --git a/pkg/plugin/run.go b/pkg/plugin/run.go index cf1277fd0..4a319a867 100644 --- a/pkg/plugin/run.go +++ b/pkg/plugin/run.go @@ -24,8 +24,6 @@ const ( ) // The scheduler plugin currently supports v3.0 to v5.0 of the agent<->scheduler plugin protocol. -// -// If you update either of these values, make sure to also update VERSIONING.md. const ( MinPluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV3_0 MaxPluginProtocolVersion api.PluginProtoVersion = api.PluginProtoV5_0 diff --git a/pkg/plugin/state.go b/pkg/plugin/state.go index 0e8d369df..941318df5 100644 --- a/pkg/plugin/state.go +++ b/pkg/plugin/state.go @@ -698,7 +698,6 @@ func (e *AutoscaleEnforcer) speculativeReserve( includeBuffer bool, accept func(verdict verdictSet, overBudget bool) bool, ) (ok bool, _ verdictSet) { - // Construct the speculative state of the pod // // We'll pass this into (resourceTransitioner).handleReserve(), but only commit the changes if diff --git a/pkg/reporting/README.md b/pkg/reporting/README.md new file mode 100644 index 000000000..e46cbfcb1 --- /dev/null +++ b/pkg/reporting/README.md @@ -0,0 +1,5 @@ +# reporting + +The autoscaler-agent reports multiple types of data (billing data, scaling events) in multiple ways +(HTTP, S3, Azure Blob), so `reporting` is the abstraction allowing us to deduplicate code between +them. diff --git a/pkg/reporting/client.go b/pkg/reporting/client.go new file mode 100644 index 000000000..0324a40da --- /dev/null +++ b/pkg/reporting/client.go @@ -0,0 +1,162 @@ +package reporting + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + + "go.uber.org/zap" +) + +type Client[E any] struct { + Name string + Base BaseClient + BaseConfig BaseClientConfig + SerializeBatch func(events []E) ([]byte, SimplifiableError) +} + +// BaseClient is the shared lower-level interface to send the processed data somewhere. 
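As a concrete illustration of how the pieces in pkg/reporting/client.go compose, here is a minimal sketch of assembling a Client[E], using the HTTP client introduced later in this diff; the event type, URL, and config values are hypothetical, not part of the change:

package main

import (
	"net/http"

	"github.com/neondatabase/autoscaling/pkg/reporting"
)

// usageEvent is a hypothetical event type standing in for the agent's real
// billing/scaling events.
type usageEvent struct {
	VMName  string  `json:"vm_name"`
	CPUSecs float64 `json:"cpu_secs"`
}

// buildClient wires an HTTP BaseClient to a JSON-lines batch serializer.
func buildClient(httpc *http.Client) reporting.Client[usageEvent] {
	return reporting.Client[usageEvent]{
		Name: "http-usage", // label used in logs and metrics
		Base: reporting.NewHTTPClient(httpc, reporting.HTTPClientConfig{
			URL:    "https://collector.example.invalid/usage", // placeholder
			Method: http.MethodPost,
		}),
		BaseConfig: reporting.BaseClientConfig{
			PushEverySeconds:          10,
			PushRequestTimeoutSeconds: 5,
			MaxBatchSize:              100,
		},
		SerializeBatch: reporting.JSONLinesMarshalBatch[usageEvent],
	}
}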
+// +// It's split into the client itself, intended to be used as a kind of persistent object, and a +// separate ClientRequest object, intended to be used only for the lifetime of a single request. +// +// See S3Client, AzureBlobClient, and HTTPClient. +type BaseClient interface { + NewRequest() ClientRequest +} + +var ( + _ BaseClient = (*S3Client)(nil) + _ BaseClient = (*AzureClient)(nil) + _ BaseClient = (*HTTPClient)(nil) +) + +// ClientRequest is the abstract interface for a single request to send a batch of processed data. +// +// This exists as a separate interface because there are some request-scoped values that we'd like +// to include in the call to LogFields(). +type ClientRequest interface { + LogFields() zap.Field + Send(ctx context.Context, payload []byte) SimplifiableError +} + +type BaseClientConfig struct { + PushEverySeconds uint `json:"pushEverySeconds"` + PushRequestTimeoutSeconds uint `json:"pushRequestTimeoutSeconds"` + MaxBatchSize uint `json:"maxBatchSize"` +} + +// SimplifiableError is an extension of the standard 'error' interface that provides a +// safe-for-metrics string representing the error. +type SimplifiableError interface { + error + + Simplified() string +} + +// WrapSerialize is a combinator that takes an existing function valid for Client.SerializeBatch and +// produces a new function that applies the 'wrapper' function to the output before returning it. +// +// This can be used, for example, to provide a SerializeBatch implementation that gzips the data +// after encoding it as JSON, e.g., by: +// +// WrapSerialize(GZIPCompress, JSONLinesMarshalBatch) +func WrapSerialize[E any]( + wrapper func([]byte) ([]byte, SimplifiableError), + base func([]E) ([]byte, SimplifiableError), +) func([]E) ([]byte, SimplifiableError) { + return func(events []E) ([]byte, SimplifiableError) { + bs, err := base(events) + if err != nil { + return nil, err + } + return wrapper(bs) + } +} + +// JSONMarshalBatch is a helper function to trivially build a function satisfying +// Client.SerializeBatch. +// +// This function can't *directly* be used, because it takes any type as input, but a small wrapper +// function typically will suffice. +// +// Why not take a list directly? Sometimes there's a small amount of wrapping we'd like to do, e.g. +// packaging it as a struct instead of directly an array. +// +// See also: JSONLinesMarshalBatch, which *can* be used directly. +func JSONMarshalBatch[V any](value V) ([]byte, SimplifiableError) { + bs, err := json.Marshal(value) + if err != nil { + return nil, jsonError{err: err} + } + return bs, nil +} + +// JSONLinesMarshalBatch is a function to implement Client.SerializeBatch by serializing each event +// in the batch onto a separate JSON line. +// +// See also: JSONMarshalBatch +func JSONLinesMarshalBatch[E any](events []E) ([]byte, SimplifiableError) { + buf := bytes.Buffer{} + encoder := json.NewEncoder(&buf) + for i := range events { + // note: encoder.Encode appends a newline after encoding. This makes it valid for the + // "json lines" format. 
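+ // For instance, two hypothetical events {"vm":"a"} and {"vm":"b"} would come
+ // out as two newline-terminated JSON documents:
+ //
+ //	{"vm":"a"}
+ //	{"vm":"b"}
+ //
+ // i.e. one event per line, as line-oriented "json lines" consumers expect.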
+ err := encoder.Encode(&events[i]) + if err != nil { + return nil, jsonError{err: err} + } + } + return buf.Bytes(), nil +} + +type jsonError struct { + err error +} + +func (e jsonError) Error() string { + return fmt.Sprintf("%s: %s", e.Simplified(), e.err.Error()) +} + +func (e jsonError) Unwrap() error { + return e.err +} + +func (e jsonError) Simplified() string { + return "JSON marshaling error" +} + +// GZIPCompress is a helper function to compress a byte string with gzip +func GZIPCompress(payload []byte) ([]byte, SimplifiableError) { + buf := bytes.Buffer{} + + gzW := gzip.NewWriter(&buf) + _, err := gzW.Write(payload) + if err != nil { + return nil, gzipError{err: err} + } + + err = gzW.Close() // Have to close it before reading the buffer + if err != nil { + return nil, gzipError{err: err} + } + return buf.Bytes(), nil +} + +type gzipError struct { + err error +} + +func (e gzipError) Error() string { + return fmt.Sprintf("%s: %s", e.Simplified(), e.err.Error()) +} + +func (e gzipError) Unwrap() error { + return e.err +} + +func (e gzipError) Simplified() string { + return "gzip compression error" +} diff --git a/pkg/reporting/client_azureblob.go b/pkg/reporting/client_azureblob.go new file mode 100644 index 000000000..b998cda05 --- /dev/null +++ b/pkg/reporting/client_azureblob.go @@ -0,0 +1,120 @@ +package reporting + +import ( + "context" + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type AzureAuthSharedKey struct { + AccountName string `json:"accountName"` + AccountKey string `json:"accountKey"` +} + +type AzureBlobStorageClientConfig struct { + // In Azure a Container is close to a bucket in AWS S3 + Container string `json:"container"` + // Example Endpoint: "https://MYSTORAGEACCOUNT.blob.core.windows.net/" + Endpoint string `json:"endpoint"` +} + +type AzureClient struct { + cfg AzureBlobStorageClientConfig + client *azblob.Client + + generateKey func() string +} + +type AzureError struct { + Err error +} + +func (e AzureError) Error() string { + return fmt.Sprintf("%s: %s", e.Simplified(), e.Err.Error()) +} + +func (e AzureError) Unwrap() error { + return e.Err +} + +func (e AzureError) Simplified() string { + return "Azure Blob error" +} + +func NewAzureBlobStorageClient( + cfg AzureBlobStorageClientConfig, + generateKey func() string, +) (*AzureClient, error) { + //nolint:exhaustruct // It's part of Azure SDK + clientOptions := &azblob.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Telemetry: policy.TelemetryOptions{ApplicationID: "neon-autoscaler"}, + }, + } + + credential, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, err + } + client, err := azblob.NewClient(cfg.Endpoint, credential, clientOptions) + if err != nil { + return nil, &AzureError{err} + } + + return NewAzureBlobStorageClientWithBaseClient(client, cfg, generateKey), nil +} + +func NewAzureBlobStorageClientWithBaseClient( + client *azblob.Client, + cfg AzureBlobStorageClientConfig, + generateKey func() string, +) *AzureClient { + return &AzureClient{ + cfg: cfg, + client: client, + generateKey: generateKey, + } +} + +// NewRequest implements BaseClient +func (c AzureClient) NewRequest() ClientRequest { + return &azureRequest{ + AzureClient: c, + key: c.generateKey(), + } +} + +// azureRequest is the implementation of ClientRequest used 
by AzureClient +type azureRequest struct { + AzureClient + key string +} + +// LogFields implements ClientRequest +func (r *azureRequest) LogFields() zap.Field { + return zap.Inline(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("container", r.cfg.Container) + enc.AddString("key", r.key) + enc.AddString("endpoint", r.cfg.Endpoint) + return nil + })) +} + +// Send implements ClientRequest +func (r *azureRequest) Send(ctx context.Context, payload []byte) SimplifiableError { + var err error + + opts := azblob.UploadBufferOptions{} //nolint:exhaustruct // It's part of Azure SDK + _, err = r.client.UploadBuffer(ctx, r.cfg.Container, r.key, payload, &opts) + if err != nil { + return AzureError{Err: err} + } + + return nil +} diff --git a/pkg/billing/azureblobstoragebilling_test.go b/pkg/reporting/client_azureblobbilling_test.go similarity index 75% rename from pkg/billing/azureblobstoragebilling_test.go rename to pkg/reporting/client_azureblobbilling_test.go index ac33cb26a..e3adb4449 100644 --- a/pkg/billing/azureblobstoragebilling_test.go +++ b/pkg/reporting/client_azureblobbilling_test.go @@ -1,4 +1,4 @@ -package billing +package reporting import ( "bytes" @@ -60,7 +60,6 @@ func TestAzureClient_send(t *testing.T) { ctx context.Context cfg AzureBlobStorageClientConfig payload []byte - traceID TraceID client *AzureClient } type output struct { @@ -88,7 +87,7 @@ func TestAzureClient_send(t *testing.T) { { name: "can write then read it", when: func(t *testing.T, i *input) { - _, err := i.client.c.CreateContainer(i.ctx, i.cfg.Container, + _, err := i.client.client.CreateContainer(i.ctx, i.cfg.Container, &azblob.CreateContainerOptions{}, //nolint:exhaustruct // OK for tests ) require.NoError(t, err) @@ -97,7 +96,7 @@ func TestAzureClient_send(t *testing.T) { require.NoError(t, o.err) b := make([]byte, 1000) const expectedText = "hello, billing data is here" - read, err := o.c.c.DownloadBuffer(o.ctx, "test-container", "test-blob-name", b, + read, err := o.c.client.DownloadBuffer(o.ctx, "test-container", "test-blob-name", b, &azblob.DownloadBufferOptions{}, //nolint:exhaustruct // OK for tests ) b = b[0:read] @@ -117,41 +116,42 @@ func TestAzureClient_send(t *testing.T) { endpoint := fmt.Sprintf("http://%s:%d/devstoreaccount1", azureBlobStorage.Host, azureBlobStorage.c.Ports["blob"].Port) + // Using well known credentials, + // see https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite + shKey, err := azblob.NewSharedKeyCredential( + "devstoreaccount1", + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", + ) + if err != nil { + panic(err) + } + + baseClient, err := azblob.NewClientWithSharedKeyCredential(endpoint, shKey, nil) + if err != nil { + panic(err) + } + // feel free to override in tests. 
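Outside the tests, a caller would typically construct the Azure client with the default credential chain and a key generator that produces a unique blob name per batch. A sketch under those assumptions (the container, endpoint, and key layout below are illustrative, not from this diff):

package main

import (
	"fmt"
	"time"

	"github.com/neondatabase/autoscaling/pkg/reporting"
)

// newAzureEventsClient builds an AzureClient whose blob keys encode the upload
// time, so successive batches never overwrite one another.
func newAzureEventsClient() (*reporting.AzureClient, error) {
	generateKey := func() string {
		now := time.Now().UTC()
		// e.g. "events/2024/05/events_1715900000.ndjson.gz" (hypothetical layout)
		return fmt.Sprintf("events/%s/events_%d.ndjson.gz", now.Format("2006/01"), now.Unix())
	}
	return reporting.NewAzureBlobStorageClient(reporting.AzureBlobStorageClientConfig{
		Container: "autoscaler-events",                        // placeholder
		Endpoint:  "https://myaccount.blob.core.windows.net/", // placeholder
	}, generateKey)
}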
- i := &input{ //nolint:exhaustruct // OK for tests - payload: []byte("hello, billing data is here"), + generateKey := func() string { + return "test-blob-name" + } + cfg := AzureBlobStorageClientConfig{ + Endpoint: endpoint, + Container: "test-container", + } + payload, err := GZIPCompress([]byte("hello, billing data is here")) + if err != nil { + panic(err) + } + i := &input{ + payload: payload, ctx: ctx, - cfg: AzureBlobStorageClientConfig{ - Endpoint: endpoint, - getClient: func() (*azblob.Client, error) { - // Using well known credentials, - // see https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite - shKey, err := azblob.NewSharedKeyCredential( - "devstoreaccount1", - "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==") - if err != nil { - panic(err) - } - - client, err := azblob.NewClientWithSharedKeyCredential(endpoint, shKey, nil) - if err != nil { - panic(err) - } - return client, nil - }, - PrefixInContainer: "test-prefix", - Container: "test-container", - generateKey: func() string { - return "test-blob-name" - }, - }, + cfg: cfg, + client: NewAzureBlobStorageClientWithBaseClient(baseClient, cfg, generateKey), } - c, err := NewAzureBlobStorageClient(i.cfg) - require.NoError(t, err) - i.client = c tt.when(t, i) - err = i.client.send(ctx, i.payload, i.traceID) + err = i.client.NewRequest().Send(ctx, i.payload) tt.then(t, output{ err: err, @@ -164,7 +164,11 @@ func TestAzureClient_send(t *testing.T) { func TestBytesForBilling(t *testing.T) { const expectedText = "hello, billing data is here" - billing, err := compress([]byte(expectedText)) + // pre-declare errors because otherwise we get type conflicts, as GZIPCompress returns a more + // specific type than just 'error'. + var err error + var billing []byte + billing, err = GZIPCompress([]byte(expectedText)) require.NoError(t, err) storage, err := bytesFromStorage(billing) require.NoError(t, err) diff --git a/pkg/reporting/client_http.go b/pkg/reporting/client_http.go new file mode 100644 index 000000000..1f632fbb4 --- /dev/null +++ b/pkg/reporting/client_http.go @@ -0,0 +1,107 @@ +package reporting + +import ( + "bytes" + "context" + "fmt" + "net/http" + + "github.com/lithammer/shortuuid" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/neondatabase/autoscaling/pkg/util" +) + +type HTTPClient struct { + client *http.Client + cfg HTTPClientConfig +} + +type HTTPClientConfig struct { + URL string `json:"url"` + Method string `json:"method"` +} + +type httpRequestError struct { + err error +} + +func (e httpRequestError) Error() string { + return fmt.Sprintf("Error making request: %s", e.err.Error()) +} + +func (e httpRequestError) Unwrap() error { + return e.err +} + +func (e httpRequestError) Simplified() string { + return util.RootError(e.err).Error() +} + +type httpUnexpectedStatusCodeError struct { + statusCode int +} + +func (e httpUnexpectedStatusCodeError) Error() string { + return fmt.Sprintf("Unexpected HTTP status code %d", e.statusCode) +} + +func (e httpUnexpectedStatusCodeError) Simplified() string { + return fmt.Sprintf("HTTP code %d", e.statusCode) +} + +func NewHTTPClient(client *http.Client, cfg HTTPClientConfig) HTTPClient { + return HTTPClient{ + client: client, + cfg: cfg, + } +} + +// NewRequest implements BaseClient +func (c HTTPClient) NewRequest() ClientRequest { + return &httpRequest{ + HTTPClient: c, + traceID: shortuuid.New(), + } +} + +// httpRequest is the implementation of ClientRequest used by HTTPClient +type httpRequest 
struct { + HTTPClient + traceID string +} + +// Send implements ClientRequest +func (r *httpRequest) Send(ctx context.Context, payload []byte) SimplifiableError { + req, err := http.NewRequestWithContext(ctx, r.cfg.Method, r.cfg.URL, bytes.NewReader(payload)) + if err != nil { + return httpRequestError{err: err} + } + req.Header.Set("content-type", "application/json") + req.Header.Set("x-trace-id", r.traceID) + + resp, err := r.client.Do(req) + if err != nil { + return httpRequestError{err: err} + } + defer resp.Body.Close() + + // theoretically if wanted/needed, we should use an http handler that + // does the retrying, to avoid writing that logic here. + if resp.StatusCode != http.StatusOK { + return httpUnexpectedStatusCodeError{statusCode: resp.StatusCode} + } + + return nil +} + +// LogFields implements ClientRequest +func (r *httpRequest) LogFields() zap.Field { + return zap.Inline(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("url", r.cfg.URL) + enc.AddString("method", r.cfg.Method) + enc.AddString("traceID", r.traceID) + return nil + })) +} diff --git a/pkg/reporting/client_s3.go b/pkg/reporting/client_s3.go new file mode 100644 index 000000000..30d8407ac --- /dev/null +++ b/pkg/reporting/client_s3.go @@ -0,0 +1,114 @@ +package reporting + +import ( + "bytes" + "context" + "fmt" + "time" + + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// S3Client is a BaseClient for S3 +type S3Client struct { + cfg S3ClientConfig + client *s3.Client + + generateKey func() string +} + +type S3ClientConfig struct { + Bucket string `json:"bucket"` + Region string `json:"region"` + Endpoint string `json:"endpoint"` +} + +type S3Error struct { + Err error +} + +func (e S3Error) Error() string { + return fmt.Sprintf("%s: %s", e.Simplified(), e.Err.Error()) +} + +func (e S3Error) Unwrap() error { + return e.Err +} + +func (e S3Error) Simplified() string { + return "S3 error" +} + +func NewS3Client( + ctx context.Context, + cfg S3ClientConfig, + generateKey func() string, +) (*S3Client, error) { + // Timeout in case we have hidden IO inside config creation + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + s3Config, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region)) + if err != nil { + return nil, S3Error{Err: err} + } + + client := s3.NewFromConfig(s3Config, func(o *s3.Options) { + if cfg.Endpoint != "" { + o.BaseEndpoint = &cfg.Endpoint + } + o.UsePathStyle = true // required for minio + }) + + return &S3Client{ + cfg: cfg, + client: client, + + generateKey: generateKey, + }, nil +} + +// NewRequest implements BaseClient +func (c *S3Client) NewRequest() ClientRequest { + return &s3Request{ + S3Client: c, + key: c.generateKey(), + } +} + +// s3Request is the implementation of ClientRequest used by S3Client +type s3Request struct { + *S3Client + key string +} + +// LogFields implements ClientRequest +func (r *s3Request) LogFields() zap.Field { + return zap.Inline(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("bucket", r.cfg.Bucket) + enc.AddString("key", r.key) + enc.AddString("region", r.cfg.Region) + enc.AddString("endpoint", r.cfg.Endpoint) + return nil + })) +} + +// Send implements ClientRequest +func (r *s3Request) Send(ctx context.Context, payload []byte) SimplifiableError { + var err error + + body := bytes.NewReader(payload) + _, err = r.client.PutObject(ctx, 
&s3.PutObjectInput{ //nolint:exhaustruct // AWS SDK + Bucket: &r.cfg.Bucket, + Key: &r.key, + Body: body, + }) + if err != nil { + return S3Error{Err: err} + } + + return nil +} diff --git a/pkg/agent/billing/queue.go b/pkg/reporting/queue.go similarity index 95% rename from pkg/agent/billing/queue.go rename to pkg/reporting/queue.go index 83af4962b..174e46753 100644 --- a/pkg/agent/billing/queue.go +++ b/pkg/reporting/queue.go @@ -1,4 +1,4 @@ -package billing +package reporting // Implementation of the event queue for mediating event generation and event sending. // @@ -13,7 +13,6 @@ import ( "golang.org/x/exp/slices" ) -// this is generic just so there's less typing - "billing.IncrementalEvent" is long! type eventQueueInternals[E any] struct { mu sync.Mutex items []E diff --git a/pkg/agent/billing/send.go b/pkg/reporting/send.go similarity index 58% rename from pkg/agent/billing/send.go rename to pkg/reporting/send.go index b20b5a4ad..0a2780d06 100644 --- a/pkg/agent/billing/send.go +++ b/pkg/reporting/send.go @@ -1,30 +1,18 @@ -package billing - -// Logic responsible for sending billing events by repeatedly pulling from the eventQueue +package reporting import ( "context" - "fmt" "time" "go.uber.org/zap" - - "github.com/neondatabase/autoscaling/pkg/billing" - "github.com/neondatabase/autoscaling/pkg/util" ) -type clientInfo struct { - client billing.Client - name string - config BaseClientConfig -} - -type eventSender struct { - clientInfo +type eventSender[E any] struct { + client Client[E] - metrics PromMetrics - queue eventQueuePuller[*billing.IncrementalEvent] - collectorFinished util.CondChannelReceiver + metrics *EventSinkMetrics + queue eventQueuePuller[E] + done <-chan struct{} // lastSendDuration tracks the "real" last full duration of (eventSender).sendAllCurrentEvents(). 
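Before the sender loop itself, it may help to see how a sender-side client is put together end to end: batches pulled from the queue go through Client.SerializeBatch and then out via a ClientRequest. A sketch composing the helpers from client.go with the S3 client, reusing the hypothetical usageEvent type from the earlier sketch (bucket config and names are assumptions):

package main

import "github.com/neondatabase/autoscaling/pkg/reporting"

// newS3EventsClient attaches a gzip-compressed JSON-lines serializer to an
// S3-backed BaseClient; s3c is assumed to come from reporting.NewS3Client.
func newS3EventsClient(s3c *reporting.S3Client, base reporting.BaseClientConfig) reporting.Client[usageEvent] {
	return reporting.Client[usageEvent]{
		Name:       "s3-usage",
		Base:       s3c, // *S3Client implements BaseClient via NewRequest
		BaseConfig: base,
		// Compose the two helpers from client.go: JSON-lines encode, then gzip.
		SerializeBatch: reporting.WrapSerialize(
			reporting.GZIPCompress,
			reporting.JSONLinesMarshalBatch[usageEvent],
		),
	}
}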
// @@ -49,16 +37,16 @@ type eventSender struct { lastSendDuration time.Duration } -func (s eventSender) senderLoop(logger *zap.Logger) { - ticker := time.NewTicker(time.Second * time.Duration(s.config.PushEverySeconds)) +func (s eventSender[E]) senderLoop(logger *zap.Logger) { + ticker := time.NewTicker(time.Second * time.Duration(s.client.BaseConfig.PushEverySeconds)) defer ticker.Stop() for { final := false select { - case <-s.collectorFinished.Recv(): - logger.Info("Received notification that collector finished") + case <-s.done: + logger.Info("Received notification that events submission is done") final = true case <-ticker.C: } @@ -72,13 +60,13 @@ func (s eventSender) senderLoop(logger *zap.Logger) { } } -func (s eventSender) sendAllCurrentEvents(logger *zap.Logger) { +func (s eventSender[E]) sendAllCurrentEvents(logger *zap.Logger) { logger.Info("Pushing all available events") if s.queue.size() == 0 { - logger.Info("No billing events to push") + logger.Info("No events to push") s.lastSendDuration = 0 - s.metrics.lastSendDuration.WithLabelValues(s.clientInfo.name).Set(1e-6) // small value, to indicate that nothing happened + s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(1e-6) // small value, to indicate that nothing happened return } @@ -97,12 +85,12 @@ func (s eventSender) sendAllCurrentEvents(logger *zap.Logger) { logger.Info("Current queue size is non-zero", zap.Int("queueSize", size)) } - chunk := s.queue.get(int(s.config.MaxBatchSize)) + chunk := s.queue.get(int(s.client.BaseConfig.MaxBatchSize)) count := len(chunk) if count == 0 { totalTime := time.Since(startTime) s.lastSendDuration = totalTime - s.metrics.lastSendDuration.WithLabelValues(s.clientInfo.name).Set(totalTime.Seconds()) + s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(totalTime.Seconds()) logger.Info( "All available events have been sent", @@ -112,21 +100,28 @@ func (s eventSender) sendAllCurrentEvents(logger *zap.Logger) { return } - traceID := billing.GenerateTraceID() + req := s.client.Base.NewRequest() logger.Info( - "Pushing billing events", + "Pushing events", zap.Int("count", count), - zap.String("traceID", string(traceID)), - s.client.LogFields(), + req.LogFields(), ) reqStart := time.Now() - err := func() error { - reqCtx, cancel := context.WithTimeout(context.TODO(), time.Second*time.Duration(s.config.PushRequestTimeoutSeconds)) + err := func() SimplifiableError { + reqCtx, cancel := context.WithTimeout( + context.TODO(), + time.Second*time.Duration(s.client.BaseConfig.PushRequestTimeoutSeconds), + ) defer cancel() - return billing.Send(reqCtx, s.client, traceID, chunk) + payload, err := s.client.SerializeBatch(chunk) + if err != nil { + return err + } + + return req.Send(reqCtx, payload) }() reqDuration := time.Since(reqStart) @@ -137,31 +132,17 @@ func (s eventSender) sendAllCurrentEvents(logger *zap.Logger) { "Failed to push billing events", zap.Int("count", count), zap.Duration("after", reqDuration), - zap.String("traceID", string(traceID)), - s.client.LogFields(), + req.LogFields(), zap.Int("total", total), zap.Duration("totalTime", time.Since(startTime)), zap.Error(err), ) - var rootErr string - //nolint:errorlint // The type switch (instead of errors.As) is ok; billing.Send() guarantees the error types. 
- switch e := err.(type) { - case billing.JSONError: - rootErr = "JSON marshaling" - case billing.UnexpectedStatusCodeError: - rootErr = fmt.Sprintf("HTTP code %d", e.StatusCode) - case billing.S3Error: - rootErr = "S3 error" - case billing.AzureError: - rootErr = "Azure Blob error" - default: - rootErr = util.RootError(err).Error() - } - s.metrics.sendErrorsTotal.WithLabelValues(s.clientInfo.name, rootErr).Inc() + rootErr := err.Simplified() + s.metrics.sendErrorsTotal.WithLabelValues(s.client.Name, rootErr).Inc() s.lastSendDuration = 0 - s.metrics.lastSendDuration.WithLabelValues(s.clientInfo.name).Set(0.0) // use 0 as a flag that something went wrong; there's no valid time here. + s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(0.0) // use 0 as a flag that something went wrong; there's no valid time here. return } @@ -170,18 +151,17 @@ func (s eventSender) sendAllCurrentEvents(logger *zap.Logger) { currentTotalTime := time.Since(startTime) logger.Info( - "Successfully pushed some billing events", + "Successfully pushed some events", zap.Int("count", count), zap.Duration("after", reqDuration), - zap.String("traceID", string(traceID)), - s.client.LogFields(), + req.LogFields(), zap.Int("total", total), zap.Duration("totalTime", currentTotalTime), ) if currentTotalTime > s.lastSendDuration { s.lastSendDuration = currentTotalTime - s.metrics.lastSendDuration.WithLabelValues(s.clientInfo.name).Set(currentTotalTime.Seconds()) + s.metrics.lastSendDuration.WithLabelValues(s.client.Name).Set(currentTotalTime.Seconds()) } } } diff --git a/pkg/reporting/sink.go b/pkg/reporting/sink.go new file mode 100644 index 000000000..941d156dc --- /dev/null +++ b/pkg/reporting/sink.go @@ -0,0 +1,86 @@ +package reporting + +// public API for event reporting + +import ( + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type EventSink[E any] struct { + queueWriters []eventQueuePusher[E] + done func() +} + +func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] { + var queueWriters []eventQueuePusher[E] + signalDone := make(chan struct{}) + + for _, c := range clients { + qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name)) + queueWriters = append(queueWriters, qw) + + // Start the sender + sender := eventSender[E]{ + client: c, + metrics: metrics, + queue: qr, + done: signalDone, + lastSendDuration: 0, + } + go sender.senderLoop(logger.Named(fmt.Sprintf("send-%s", c.Name))) + } + + return &EventSink[E]{ + queueWriters: queueWriters, + done: sync.OnceFunc(func() { close(signalDone) }), + } +} + +// Enqueue submits the event to the internal client sending queues, returning without blocking. 
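+//
+// A sketch of typical usage (client construction elided; the metrics prefix is
+// illustrative):
+//
+//	metrics := NewEventSinkMetrics("autoscaling_agent_events")
+//	sink := NewEventSink(logger, metrics, httpClient, s3Client)
+//	sink.Enqueue(event) // fans out to every client's queue without blocking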
+func (s *EventSink[E]) Enqueue(event E) { + for _, q := range s.queueWriters { + q.enqueue(event) + } +} + +type EventSinkMetrics struct { + queueSizeCurrent *prometheus.GaugeVec + lastSendDuration *prometheus.GaugeVec + sendErrorsTotal *prometheus.CounterVec +} + +func NewEventSinkMetrics(prefix string) *EventSinkMetrics { + return &EventSinkMetrics{ + queueSizeCurrent: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: fmt.Sprintf("%s_queue_size", prefix), + Help: "Size of the billing subsystem's queue of unsent events", + }, + []string{"client"}, + ), + lastSendDuration: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: fmt.Sprintf("%s_last_send_duration_seconds", prefix), + Help: "Duration, in seconds, that it took to send the latest set of billing events (or current time if ongoing)", + }, + []string{"client"}, + ), + sendErrorsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: fmt.Sprintf("%s_send_errors_total", prefix), + Help: "Total errors from attempting to send billing events", + }, + []string{"client", "cause"}, + ), + } +} + +func (m *EventSinkMetrics) MustRegister(reg *prometheus.Registry) { + reg.MustRegister(m.queueSizeCurrent) + reg.MustRegister(m.lastSendDuration) + reg.MustRegister(m.sendErrorsTotal) +} diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go index bd984cf0d..b7d26faaf 100644 --- a/pkg/util/watch/watch.go +++ b/pkg/util/watch/watch.go @@ -742,10 +742,12 @@ func keyForObj[T any](obj *T) util.NamespacedName { func (i *NameIndex[T]) Add(obj *T) { i.namespacedNames[keyForObj(obj)] = obj } + func (i *NameIndex[T]) Update(oldObj, newObj *T) { i.Delete(oldObj) i.Add(newObj) } + func (i *NameIndex[T]) Delete(obj *T) { delete(i.namespacedNames, keyForObj(obj)) } @@ -781,10 +783,12 @@ func getName[T any](obj *T) string { func (i *FlatNameIndex[T]) Add(obj *T) { i.names[getName(obj)] = obj } + func (i *FlatNameIndex[T]) Update(oldObj, newObj *T) { i.Delete(oldObj) i.Add(newObj) } + func (i *FlatNameIndex[T]) Delete(obj *T) { delete(i.names, getName(obj)) } diff --git a/vm-builder/main.go b/vm-builder/main.go index 6771e7dea..3c379aa4c 100644 --- a/vm-builder/main.go +++ b/vm-builder/main.go @@ -92,7 +92,7 @@ func addFileToTar(tw *tar.Writer, filename string, contents []byte) error { tarHeader := &tar.Header{ Name: filename, Size: int64(len(contents)), - Mode: 0755, // TODO: shouldn't just set this for everything. + Mode: 0o755, // TODO: shouldn't just set this for everything. } if err := tw.WriteHeader(tarHeader); err != nil { @@ -452,7 +452,6 @@ func main() { } } - } type imageSpec struct {