refactor: convert MachineSetStatus controller to QController
Pretty much replace the controller code: restructure it, split it into
smaller chunks, and cover it with tests.
Extract the machine teardown logic into a separate controller.

Signed-off-by: Artem Chernyshev <[email protected]>
Unix4ever committed Mar 5, 2024
1 parent 47d8429 commit 0eed757
Showing 44 changed files with 3,805 additions and 1,318 deletions.
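
For context, the qtransform-based QController pattern that this commit converts the controllers to looks roughly as follows. This is a hedged sketch assembled from the NewClusterMachineConfigController settings shown further down; the controller name and transform body are illustrative, and the TransformFunc signature is assumed from the cosi-project runtime qtransform package rather than quoted from this commit.

// Sketch only: the general shape of a qtransform QController, which maps one
// input resource type to one output resource type and lets the framework
// handle watches, requeues, and finalizers.
func NewExampleQController() *qtransform.QController[*omni.ClusterMachine, *omni.ClusterMachineConfig] {
	return qtransform.NewQController(
		qtransform.Settings[*omni.ClusterMachine, *omni.ClusterMachineConfig]{
			Name: "ExampleQController", // hypothetical name
			MapMetadataFunc: func(clusterMachine *omni.ClusterMachine) *omni.ClusterMachineConfig {
				return omni.NewClusterMachineConfig(resources.DefaultNamespace, clusterMachine.Metadata().ID())
			},
			UnmapMetadataFunc: func(machineConfig *omni.ClusterMachineConfig) *omni.ClusterMachine {
				return omni.NewClusterMachine(resources.DefaultNamespace, machineConfig.Metadata().ID())
			},
			TransformFunc: func(ctx context.Context, r controller.Reader, logger *zap.Logger,
				clusterMachine *omni.ClusterMachine, machineConfig *omni.ClusterMachineConfig,
			) error {
				// A real controller reconciles the output resource from the input here.
				return nil
			},
		},
	)
}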
3 changes: 0 additions & 3 deletions client/pkg/omni/resources/omni/labels.go
@@ -46,9 +46,6 @@ const (
// tsgen:LabelMachine
LabelMachine = SystemLabelPrefix + "machine"

-// LabelSkipTeardown is the test label that configures machine set controller to skip teardown sequence for the cluster machine.
-LabelSkipTeardown = SystemLabelPrefix + "machine-set-skip-teardown"
-
// LabelSystemPatch marks the patch as the system patch, so it shouldn't be editable by the user.
// tsgen:LabelSystemPatch
LabelSystemPatch = SystemLabelPrefix + "system-patch"
2 changes: 1 addition & 1 deletion cmd/omni-integration-test/pkg/tests/machines.go
@@ -180,7 +180,7 @@ func AssertForceRemoveWorkerNode(testCtx context.Context, st state.State, cluste
// The VM is wiped & rebooted to bring it back as an available machine.
func AssertControlPlaneForceReplaceMachine(testCtx context.Context, st state.State, clusterName string, options Options) TestFunc {
return func(t *testing.T) {
-ctx, cancel := context.WithTimeout(testCtx, 90*time.Second)
+ctx, cancel := context.WithTimeout(testCtx, 5*time.Minute)
defer cancel()

if options.WipeAMachineFunc == nil {
129 changes: 129 additions & 0 deletions internal/backend/runtime/omni/controllers/helpers/helpers.go
@@ -0,0 +1,129 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

// Package helpers contains common utility methods for COSI controllers of Omni.
package helpers

import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/kvutils"
"github.com/siderolabs/gen/xslices"

"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)

// InputResourceVersionAnnotation is the annotation name where the inputs version sha is stored.
const InputResourceVersionAnnotation = "inputResourceVersion"

// UpdateInputsVersions generates a hash of the resource by combining its inputs.
func UpdateInputsVersions[T resource.Resource](out resource.Resource, inputs ...T) bool {
return UpdateInputsAnnotation(out, xslices.Map(inputs, func(input T) string {
return fmt.Sprintf("%s/%s@%s", input.Metadata().Type(), input.Metadata().ID(), input.Metadata().Version())
})...)
}

// UpdateInputsAnnotation updates the annotation with the input resource version and returns if it has changed.
func UpdateInputsAnnotation(out resource.Resource, versions ...string) bool {
hash := sha256.New()

for i, version := range versions {
if i > 0 {
hash.Write([]byte(","))
}

hash.Write([]byte(version))
}

inVersion := hex.EncodeToString(hash.Sum(nil))

version, found := out.Metadata().Annotations().Get(InputResourceVersionAnnotation)

if found && version == inVersion {
return false
}

out.Metadata().Annotations().Set(InputResourceVersionAnnotation, inVersion)

return true
}

// CopyAllLabels copies all labels from one resource to another.
func CopyAllLabels(src, dst resource.Resource) {
dst.Metadata().Labels().Do(func(tmp kvutils.TempKV) {
for key, value := range src.Metadata().Labels().Raw() {
tmp.Set(key, value)
}
})
}

// CopyLabels copies the labels from one resource to another.
func CopyLabels(src, dst resource.Resource, keys ...string) {
dst.Metadata().Labels().Do(func(tmp kvutils.TempKV) {
for _, key := range keys {
if label, ok := src.Metadata().Labels().Get(key); ok {
tmp.Set(key, label)
}
}
})
}

// CopyAllAnnotations copies all annotations from one resource to another.
func CopyAllAnnotations(src, dst resource.Resource) {
dst.Metadata().Annotations().Do(func(tmp kvutils.TempKV) {
for key, value := range src.Metadata().Annotations().Raw() {
tmp.Set(key, value)
}
})
}

// CopyAnnotations copies annotations from one resource to another.
func CopyAnnotations(src, dst resource.Resource, annotations ...string) {
dst.Metadata().Annotations().Do(func(tmp kvutils.TempKV) {
for _, key := range annotations {
if label, ok := src.Metadata().Annotations().Get(key); ok {
tmp.Set(key, label)
}
}
})
}

// CopyUserLabels copies all user labels from one resource to another.
// It removes all user labels on the target that are not present in the source resource.
// System labels are not copied.
func CopyUserLabels(target resource.Resource, labels map[string]string) {
ClearUserLabels(target)

if len(labels) == 0 {
return
}

target.Metadata().Labels().Do(func(tmp kvutils.TempKV) {
for key, value := range labels {
if strings.HasPrefix(key, omni.SystemLabelPrefix) {
continue
}

tmp.Set(key, value)
}
})
}

// ClearUserLabels removes all user labels from the resource.
func ClearUserLabels(res resource.Resource) {
res.Metadata().Labels().Do(func(tmp kvutils.TempKV) {
for key := range res.Metadata().Labels().Raw() {
if strings.HasPrefix(key, omni.SystemLabelPrefix) {
continue
}

tmp.Delete(key)
}
})
}
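
As a usage note for the helpers above: UpdateInputsVersions exists to make reconciles cheap to skip; a second call with unchanged inputs returns false. A minimal sketch under stated assumptions (the helpers package is internal to the module, and the resource IDs are hypothetical):

package main

import (
	"fmt"

	"github.com/siderolabs/omni/client/pkg/omni/resources"
	"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
	"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
)

func main() {
	out := omni.NewCluster(resources.DefaultNamespace, "example")
	in := omni.NewMachine(resources.DefaultNamespace, "machine-1")

	// First call writes the inputResourceVersion annotation and reports a change.
	fmt.Println(helpers.UpdateInputsVersions(out, in)) // true

	// Unchanged inputs hash to the same value, so a controller can skip reconciling.
	fmt.Println(helpers.UpdateInputsVersions(out, in)) // false
}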
36 changes: 36 additions & 0 deletions internal/backend/runtime/omni/controllers/helpers/helpers_test.go
@@ -0,0 +1,36 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

package helpers_test

import (
"testing"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/assert"

"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
)

func TestUpdateInputsVersions(t *testing.T) {
out := omni.NewCluster("default", "test")

in := []resource.Resource{omni.NewMachine("default", "test1"), omni.NewMachine("default", "test2")}

assert.True(t, helpers.UpdateInputsVersions(out, in...))

v, _ := out.Metadata().Annotations().Get("inputResourceVersion")
assert.Equal(t, "a7a451e614fc3b4a7241798235001fea271c7ad5493c392f0a012104379bdb89", v)

assert.False(t, helpers.UpdateInputsVersions(out, in...))

in = append(in, omni.NewClusterMachine("default", "cm1"))

assert.True(t, helpers.UpdateInputsVersions(out, in...))

v, _ = out.Metadata().Annotations().Get("inputResourceVersion")
assert.Equal(t, "df4af53c3caf7ae4c0446bcf8b854ed3f5740a47eab0e5151f1962a4a4d52f6f", v)
}
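
The digests asserted above are stable because UpdateInputsAnnotation feeds the comma-joined "type/id@version" strings into a single SHA-256. A recomputation sketch; the version strings below are placeholders, since the real values depend on the resources' COSI metadata:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func main() {
	// Placeholder inputs in the "type/id@version" format used by UpdateInputsVersions.
	versions := []string{"Machines.omni.sidero.dev/test1@1", "Machines.omni.sidero.dev/test2@1"}

	h := sha256.New()

	for i, v := range versions {
		if i > 0 {
			h.Write([]byte(","))
		}

		h.Write([]byte(v))
	}

	// This hex digest is what lands in the inputResourceVersion annotation.
	fmt.Println(hex.EncodeToString(h.Sum(nil)))
}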
@@ -31,11 +31,14 @@ import (

"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/mappers"
appconfig "github.com/siderolabs/omni/internal/pkg/config"
"github.com/siderolabs/omni/internal/pkg/constants"
)

const clusterMachineConfigControllerName = "ClusterMachineConfigController"

// ClusterMachineConfigController manages machine configurations for each ClusterMachine.
//
// ClusterMachineConfigController generates machine configuration for each created machine.
@@ -45,7 +48,7 @@ type ClusterMachineConfigController = qtransform.QController[*omni.ClusterMachin
func NewClusterMachineConfigController(defaultGenOptions []generate.Option) *ClusterMachineConfigController {
return qtransform.NewQController(
qtransform.Settings[*omni.ClusterMachine, *omni.ClusterMachineConfig]{
-Name: "ClusterMachineConfigController",
+Name: clusterMachineConfigControllerName,
MapMetadataFunc: func(clusterMachine *omni.ClusterMachine) *omni.ClusterMachineConfig {
return omni.NewClusterMachineConfig(resources.DefaultNamespace, clusterMachine.Metadata().ID())
},
@@ -204,11 +207,11 @@ func reconcileClusterMachineConfig(
clusterMachineTalosVersion,
}

-if !UpdateInputsVersions(machineConfig, inputs...) {
+if !helpers.UpdateInputsVersions(machineConfig, inputs...) {
return xerrors.NewTagged[qtransform.SkipReconcileTag](errors.New("config inputs not changed"))
}

-machineConfig.Metadata().Labels().Set(omni.LabelCluster, clusterName)
+helpers.CopyLabels(clusterMachine, machineConfig, omni.LabelMachineSet, omni.LabelCluster, omni.LabelControlPlaneRole, omni.LabelWorkerRole)

// TODO: temporary transition code, remove in the future
if clusterMachine.TypedSpec().Value.KubernetesVersion == "" {
@@ -33,12 +33,16 @@ import (
"github.com/siderolabs/omni/client/pkg/meta"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/mappers"
talosutils "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/talos"
"github.com/siderolabs/omni/internal/backend/runtime/talos"
)

-const gracefulResetAttemptCount = 2
+const (
+	gracefulResetAttemptCount = 4
+	etcdLeaveAttemptsLimit    = 2
+)

// ClusterMachineConfigStatusController manages ClusterMachineStatus resource lifecycle.
//
@@ -161,6 +165,8 @@ func NewClusterMachineConfigStatusController() *ClusterMachineConfigStatusContro
return fmt.Errorf("failed to apply config to machine '%s': %w", machineConfig.Metadata().ID(), err)
}

helpers.CopyLabels(machineConfig, configStatus, omni.LabelMachineSet, omni.LabelCluster, omni.LabelControlPlaneRole, omni.LabelWorkerRole)

configStatus.TypedSpec().Value.ClusterMachineVersion = machineConfig.TypedSpec().Value.ClusterMachineVersion
configStatus.TypedSpec().Value.ClusterMachineConfigVersion = machineConfig.Metadata().Version().String()
configStatus.TypedSpec().Value.ClusterMachineConfigSha256 = shaSumString
@@ -219,8 +225,8 @@
}

type resetStatus struct {
-	attempts        uint
-	forceUngraceful bool
+	resetAttempts     uint
+	etcdLeaveAttempts uint
}

type ongoingResets struct {
@@ -243,7 +249,16 @@ func (r *ongoingResets) isGraceful(id resource.ID) bool {
return true
}

-return !rs.forceUngraceful && rs.attempts < gracefulResetAttemptCount
+return rs.resetAttempts < gracefulResetAttemptCount
}

func (r *ongoingResets) shouldLeaveEtcd(id string) bool {
rs, ok := r.getStatus(id)
if !ok {
return true
}

return rs.etcdLeaveAttempts < etcdLeaveAttemptsLimit
}

func (r *ongoingResets) handleReset(id resource.ID) uint {
Expand All @@ -254,20 +269,20 @@ func (r *ongoingResets) handleReset(id resource.ID) uint {
r.statuses[id] = &resetStatus{}
}

-r.statuses[id].attempts++
+r.statuses[id].resetAttempts++

-return r.statuses[id].attempts
+return r.statuses[id].resetAttempts
}

-func (r *ongoingResets) forceUngraceful(id resource.ID) {
+func (r *ongoingResets) handleEtcdLeave(id resource.ID) {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.statuses[id]; !ok {
r.statuses[id] = &resetStatus{}
}

-r.statuses[id].forceUngraceful = true
+r.statuses[id].etcdLeaveAttempts++
}
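
With the one-shot forceUngraceful flag replaced by two counters, each machine now carries two independent retry budgets. An illustrative call sequence (not controller code; the map initialization is assumed from the struct shown above):

o := &ongoingResets{statuses: map[resource.ID]*resetStatus{}}

o.isGraceful("m1")      // true: no reset attempts yet, budget is gracefulResetAttemptCount (4)
o.handleReset("m1")     // resetAttempts -> 1
o.shouldLeaveEtcd("m1") // true: no etcd-leave attempts yet, budget is etcdLeaveAttemptsLimit (2)
o.handleEtcdLeave("m1") // etcdLeaveAttempts -> 1

// After 4 failed graceful resets isGraceful flips to false and the controller
// falls back to an ungraceful reset; after 2 failed etcd leaves shouldLeaveEtcd
// flips to false and the reset proceeds without leaving etcd.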

func (r *ongoingResets) deleteStatus(id resource.ID) {
Expand Down Expand Up @@ -456,7 +471,7 @@ func (h *clusterMachineConfigStatusControllerHandler) reset(
machineConfig *omni.ClusterMachineConfig,
clusterMachine *omni.ClusterMachine,
) error {
-ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

logger := h.logger.With(
@@ -512,6 +527,10 @@

machineStage := statusSnapshot.TypedSpec().Value.GetMachineStatus().GetStage()

if machineStage == machineapi.MachineStatusEvent_RESETTING {
return controller.NewRequeueErrorf(time.Minute, "the machine is already being reset")
}

logger.Debug("getting ready to reset the machine", zap.Stringer("stage", machineStage))

inMaintenance := machineStage == machineapi.MachineStatusEvent_MAINTENANCE
@@ -554,7 +573,7 @@
graceful = false
}

-isControlPlane := isControlPlane(clusterMachine)
+_, isControlPlane := clusterMachine.Metadata().Labels().Get(omni.LabelControlPlaneRole)

switch {
// check that machine is ready to be reset
@@ -587,19 +606,12 @@
}

// if is control plane first leave etcd
-if graceful && isControlPlane {
-	_, err = c.EtcdForfeitLeadership(ctx, &machineapi.EtcdForfeitLeadershipRequest{})
-	if err != nil {
-		h.ongoingResets.forceUngraceful(clusterMachine.Metadata().ID())
+if isControlPlane && h.ongoingResets.shouldLeaveEtcd(clusterMachine.Metadata().ID()) {
+	h.ongoingResets.handleEtcdLeave(clusterMachine.Metadata().ID())

-		return fmt.Errorf("failed to forfeit leadership, node %q: %w", machineConfig.Metadata().ID(), err)
-	}
-
-	err = c.EtcdLeaveCluster(ctx, &machineapi.EtcdLeaveClusterRequest{})
+	err = h.gracefulEtcdLeave(ctx, c, clusterMachine.Metadata().ID())
	if err != nil {
-		h.ongoingResets.forceUngraceful(clusterMachine.Metadata().ID())
-
-		return fmt.Errorf("failed to leave etcd cluster, node %q: %w", machineConfig.Metadata().ID(), err)
+		return controller.NewRequeueError(err, time.Second)
	}
}

@@ -632,6 +644,20 @@
return fmt.Errorf("failed resetting node '%s': %w", machineConfig.Metadata().ID(), err)
}

func (h *clusterMachineConfigStatusControllerHandler) gracefulEtcdLeave(ctx context.Context, c *client.Client, id string) error {
_, err := c.EtcdForfeitLeadership(ctx, &machineapi.EtcdForfeitLeadershipRequest{})
if err != nil {
return fmt.Errorf("failed to forfeit leadership, node %q: %w", id, err)
}

err = c.EtcdLeaveCluster(ctx, &machineapi.EtcdLeaveClusterRequest{})
if err != nil {
return fmt.Errorf("failed to leave etcd cluster, node %q: %w", id, err)
}

return nil
}

func (h *clusterMachineConfigStatusControllerHandler) getClient(
ctx context.Context,
useMaintenance bool,
@@ -362,16 +362,13 @@ func (suite *ClusterMachineConfigStatusSuite) TestResetUngraceful() {
suite.destroyCluster(cluster)

for _, m := range machines {
-suite.Assert().NoError(retry.Constant(5*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
+suite.Assert().NoError(retry.Constant(30*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertNoResource(*omni.NewClusterMachineConfigStatus(resources.DefaultNamespace, m.Metadata().ID()).Metadata()),
))
}

-for i, m := range machines {
-	count := 3
-	if i == brokenEtcdMachine {
-		count = 1
-	}
+for _, m := range machines {
+	count := 5

suite.Assert().Len(machineServices[m.Metadata().ID()].getResetRequests(), count)
}
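
A hedged reading of the new expectation: with gracefulResetAttemptCount raised to 4, every machine should see four graceful reset requests plus one final ungraceful request, and because etcd-leave failures are now retried and capped separately instead of immediately forcing the ungraceful path, the broken-etcd machine no longer warrants a special lower count.

// Assumed relationship, shown for clarity rather than taken from the diff:
expectedResetRequests := gracefulResetAttemptCount + 1 // 4 graceful + 1 ungraceful = 5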
