From 896926003e907dc4478055fdee8ae2913600cff0 Mon Sep 17 00:00:00 2001
From: zoetrope
Date: Fri, 8 Sep 2023 15:15:05 +0900
Subject: [PATCH 1/2] Make `ckecli resource set` replace all user-defined
 resources.

---
 pkg/ckecli/cmd/resource_set.go | 22 +++++++++-------------
 storage.go                     | 44 ++++++++++++++++++++++++++++++++++++
 storage_test.go                | 21 +++++++++++++++++++
 3 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/pkg/ckecli/cmd/resource_set.go b/pkg/ckecli/cmd/resource_set.go
index 2ea8aa585..82351c379 100644
--- a/pkg/ckecli/cmd/resource_set.go
+++ b/pkg/ckecli/cmd/resource_set.go
@@ -13,22 +13,16 @@ import (
 	k8sYaml "k8s.io/apimachinery/pkg/util/yaml"
 )
 
-func updateResource(ctx context.Context, data []byte) error {
-	key, err := cke.ParseResource(data)
-	if err != nil {
-		return err
-	}
-
-	return storage.SetResource(ctx, key, string(data))
-}
-
 var resourceSetCmd = &cobra.Command{
 	Use:   "set FILE",
 	Short: "register user-defined resources.",
 	Long: `Register user-defined resources.
 
 FILE should contain multiple Kubernetes resources in YAML or JSON format.
-If FILE is "-", then data is read from stdin.`,
+If FILE is "-", then data is read from stdin.
+
+If a resource in FILE has the same key as a registered resource, the registered resource will be overwritten.
+If a registered resource does not appear in FILE, it will be deleted.`,
 
 	Args: cobra.ExactArgs(1),
 	RunE: func(cmd *cobra.Command, args []string) error {
@@ -43,21 +37,23 @@ If FILE is "-", then data is read from stdin.`,
 		}
 
 		well.Go(func(ctx context.Context) error {
+			newResources := make(map[string]string)
 			y := k8sYaml.NewYAMLReader(bufio.NewReader(r))
 			for {
 				data, err := y.Read()
 				if err == io.EOF {
-					return nil
+					break
 				}
 				if err != nil {
 					return err
 				}
-
-				err = updateResource(ctx, data)
+				key, err := cke.ParseResource(data)
 				if err != nil {
 					return err
 				}
+				newResources[key] = string(data)
 			}
+			return storage.ReplaceResources(ctx, newResources)
 		})
 		well.Stop()
 		return well.Wait()
diff --git a/storage.go b/storage.go
index 5895b8a85..4081a1743 100644
--- a/storage.go
+++ b/storage.go
@@ -589,6 +589,50 @@ func (s Storage) DeleteResource(ctx context.Context, key string) error {
 	return err
 }
 
+// ReplaceResources replaces all user resources with new ones.
+func (s Storage) ReplaceResources(ctx context.Context, newResources map[string]string) error {
+RETRY:
+	current, err := s.GetAllResources(ctx)
+	if err != nil {
+		return err
+	}
+	currentResources := make(map[string]ResourceDefinition)
+	for _, r := range current {
+		currentResources[r.Key] = r
+	}
+
+	var ifOps []clientv3.Cmp
+	var thenOps []clientv3.Op
+
+	for key, value := range newResources {
+		cur := currentResources[key]
+		delete(currentResources, key)
+		if value == string(cur.Definition) {
+			continue
+		}
+
+		ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", cur.Revision))
+		thenOps = append(thenOps, clientv3.OpPut(KeyResourcePrefix+key, value))
+	}
+	for key, r := range currentResources {
+		ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", r.Revision))
+		thenOps = append(thenOps, clientv3.OpDelete(KeyResourcePrefix+key))
+	}
+
+	txnResp, err := s.Txn(ctx).
+		If(ifOps...).
+		Then(thenOps...).
+		Commit()
+
+	if err != nil {
+		return err
+	}
+	if !txnResp.Succeeded {
+		goto RETRY
+	}
+	return nil
+}
+
 // IsSabakanDisabled returns true if sabakan integration is disabled.
 func (s Storage) IsSabakanDisabled(ctx context.Context) (bool, error) {
 	resp, err := s.Get(ctx, KeySabakanDisabled)
diff --git a/storage_test.go b/storage_test.go
index 5daac1b0e..50ed365d2 100644
--- a/storage_test.go
+++ b/storage_test.go
@@ -491,6 +491,27 @@ func testStorageResource(t *testing.T) {
 		t.Error(`err != ErrNotFound,`, err)
 	}
 
+	input := map[string]string{
+		"ServiceAccount/foo/sa1": "test",      // will not be changed
+		"ConfigMap/foo/conf1":    "overwrite", // will be overwritten
+		"Pod/foo/pod2":           "new",       // pod2 will be added, pod1 will be deleted
+	}
+	err = storage.ReplaceResources(ctx, input)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resources, err = storage.GetAllResources(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got := make(map[string]string)
+	for _, r := range resources {
+		got[r.Key] = string(r.Definition)
+	}
+	if !cmp.Equal(input, got) {
+		t.Error("unexpected resources", cmp.Diff(input, got))
+	}
 }
 
 func testStorageSabakan(t *testing.T) {

From 329ce31b0a4d4db0a5646a170d13992de5bdb772 Mon Sep 17 00:00:00 2001
From: zoetrope
Date: Fri, 8 Sep 2023 19:33:08 +0900
Subject: [PATCH 2/2] Once a user-defined resource is deleted, it should also
 be deleted from the Kubernetes cluster.

---
 op/resource.go          | 60 ++++++++++++++++++++++++++++++++++
 op/status.go            | 36 +++++++++++++++++++
 resource.go             | 36 +++++++++++++++++++
 server/control.go       |  6 +++-
 server/strategy.go      | 16 ++++++---
 server/strategy_test.go | 13 ++++---
 status.go               |  1 +
 storage.go              | 79 ++++++++++++++++++++++++++---------------
 8 files changed, 213 insertions(+), 34 deletions(-)

diff --git a/op/resource.go b/op/resource.go
index e7e708d73..2f301d397 100644
--- a/op/resource.go
+++ b/op/resource.go
@@ -69,3 +69,63 @@ func (o *resourceApplyOp) Command() cke.Command {
 		Target: o.resource.String(),
 	}
 }
+
+type resourceDeleteOp struct {
+	apiserver    *cke.Node
+	resourceKey  string
+	resourceData []byte
+
+	finished bool
+}
+
+// ResourceDeleteOp deletes a Kubernetes object.
+func ResourceDeleteOp(apiServer *cke.Node, resourceKey string, resourceData []byte) cke.Operator {
+	return &resourceDeleteOp{
+		apiserver:    apiServer,
+		resourceKey:  resourceKey,
+		resourceData: resourceData,
+	}
+}
+
+func (o *resourceDeleteOp) Name() string {
+	return "resource-deletion"
+}
+
+func (o *resourceDeleteOp) NextCommand() cke.Commander {
+	if o.finished {
+		return nil
+	}
+	o.finished = true
+	return o
+}
+
+func (o *resourceDeleteOp) Targets() []string {
+	return []string{
+		o.apiserver.Address,
+	}
+}
+
+func (o *resourceDeleteOp) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
+	cfg, err := inf.K8sConfig(ctx, o.apiserver)
+	if err != nil {
+		return err
+	}
+	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
+	if err != nil {
+		return err
+	}
+	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))
+
+	dyn, err := dynamic.NewForConfig(cfg)
+	if err != nil {
+		return err
+	}
+	return cke.DeleteResource(ctx, dyn, mapper, inf, o.resourceKey, o.resourceData)
+}
+
+func (o *resourceDeleteOp) Command() cke.Command {
+	return cke.Command{
+		Name:   "delete-resource",
+		Target: o.resourceKey,
+	}
+}
diff --git a/op/status.go b/op/status.go
index 81d27f89e..5ffbc951e 100644
--- a/op/status.go
+++ b/op/status.go
@@ -439,6 +439,42 @@ func GetKubernetesClusterStatus(ctx context.Context, inf cke.Infrastructure, n *
 		s.SetResourceStatus(res.Key, obj.GetAnnotations(), len(obj.GetManagedFields()) != 0)
 	}
 
+	deletionResources, err := inf.Storage().GetAllDeletionResources(ctx)
+	if err != nil {
+		return cke.KubernetesClusterStatus{}, err
+	}
+
+	s.ResourcesExistence = make(map[string]bool)
+	for key, val := range deletionResources {
+		obj := &unstructured.Unstructured{}
+		_, gvk, err := decUnstructured.Decode(val, nil, obj)
+		if err != nil {
+			return cke.KubernetesClusterStatus{}, err
+		}
+
+		mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+		if err != nil {
+			return cke.KubernetesClusterStatus{}, fmt.Errorf("failed to find REST mapping for %s: %w", gvk.String(), err)
+		}
+
+		var dr dynamic.ResourceInterface
+		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+			ns := obj.GetNamespace()
+			if ns == "" {
+				return cke.KubernetesClusterStatus{}, fmt.Errorf("no namespace for %s: name=%s", gvk.String(), obj.GetName())
+			}
+			dr = dyn.Resource(mapping.Resource).Namespace(ns)
+		} else {
+			dr = dyn.Resource(mapping.Resource)
+		}
+
+		_, err = dr.Get(ctx, obj.GetName(), metav1.GetOptions{})
+		if err != nil && !k8serr.IsNotFound(err) {
+			return cke.KubernetesClusterStatus{}, err
+		}
+		s.ResourcesExistence[key] = !k8serr.IsNotFound(err)
+	}
+
 	return s, nil
 }
diff --git a/resource.go b/resource.go
index 04f4f159d..0752a486a 100644
--- a/resource.go
+++ b/resource.go
@@ -102,6 +102,42 @@ func ApplyResource(ctx context.Context, dynclient dynamic.Interface, mapper meta
 	return err
 }
 
+// DeleteResource deletes the Kubernetes object described by data, then removes its key from the deletion list.
+func DeleteResource(ctx context.Context, dynclient dynamic.Interface, mapper meta.RESTMapper, inf Infrastructure, key string, data []byte) error {
+	obj := &unstructured.Unstructured{}
+	_, gvk, err := decUnstructured.Decode(data, nil, obj)
+	if err != nil {
+		return fmt.Errorf("failed to decode data into *Unstructured: %w", err)
+	}
+
+	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		return fmt.Errorf("failed to find REST mapping for %s: %w", gvk.String(), err)
+	}
+
+	var dr dynamic.ResourceInterface
+	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+		dr = dynclient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
+	} else {
+		dr = dynclient.Resource(mapping.Resource)
+	}
+	if log.Enabled(log.LvDebug) {
+		log.Debug("resource-deletion", map[string]interface{}{
+			"gvk":       gvk.String(),
+			"gvr":       mapping.Resource.String(),
+			"namespace": obj.GetNamespace(),
+			"name":      obj.GetName(),
+		})
+	}
+
+	err = dr.Delete(ctx, obj.GetName(), metav1.DeleteOptions{})
+	if err != nil {
+		return err
+	}
+	err = inf.Storage().DeleteDeletionResource(ctx, key)
+	return err
+}
+
 func injectCA(ctx context.Context, st Storage, obj *unstructured.Unstructured, gvk *schema.GroupVersionKind) error {
 	cacert, err := st.GetCACertificate(ctx, CAWebhook)
 	if err != nil {
diff --git a/server/control.go b/server/control.go
index e216134b3..2c46ea66d 100644
--- a/server/control.go
+++ b/server/control.go
@@ -309,6 +309,10 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
 	if err != nil {
 		return err
 	}
+	dels, err := inf.Storage().GetAllDeletionResources(ctx)
+	if err != nil {
+		return err
+	}
 
 	rqEntries, err := inf.Storage().GetRebootsEntries(ctx)
 	if err != nil {
@@ -338,7 +342,7 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
 	drainCompleted, drainTimedout, _ := op.CheckDrainCompletion(ctx, inf, nf.HealthyAPIServer(), cluster, rqEntries)
 	rebootDequeued := op.CheckRebootDequeue(ctx, cluster, rqEntries)
 
-	ops, phase := DecideOps(cluster, status, constraints, rcs, DecideOpsRebootArgs{
+	ops, phase := DecideOps(cluster, status, constraints, rcs, dels, DecideOpsRebootArgs{
 		RQEntries:      rqEntries,
 		NewlyDrained:   newlyDrained,
 		DrainCompleted: drainCompleted,
diff --git a/server/strategy.go b/server/strategy.go
index b7d2f1072..b032ac3b8 100644
--- a/server/strategy.go
+++ b/server/strategy.go
@@ -24,7 +24,7 @@ type DecideOpsRebootArgs struct {
 
 // DecideOps returns the next operations to do and the operation phase.
 // This returns nil when no operations need to be done.
-func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
+func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
 	nf := NewNodeFilter(c, cs)
 
 	// 0. Execute upgrade operation if necessary
@@ -77,7 +77,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
 	}
 
 	// 7. Maintain k8s resources.
-	if ops := k8sMaintOps(c, cs, resources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
+	if ops := k8sMaintOps(c, cs, resources, deletionResources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
 		return ops, cke.PhaseK8sMaintain
 	}
 
@@ -238,7 +238,7 @@ func etcdMaintOp(c *cke.Cluster, nf *NodeFilter) cke.Operator {
 	return nil
 }
 
-func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
+func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
 	ks := cs.Kubernetes
 	apiServer := nf.HealthyAPIServer()
 
@@ -246,7 +246,7 @@ func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.Resource
 		return []cke.Operator{op.KubeWaitOp(apiServer)}
 	}
 
-	ops = append(ops, decideResourceOps(apiServer, ks, resources, ks.IsReady(c))...)
+	ops = append(ops, decideResourceOps(apiServer, ks, resources, deletionResources, ks.IsReady(c))...)
 
 	ops = append(ops, decideClusterDNSOps(apiServer, c, ks)...)
 
@@ -611,7 +611,7 @@ func decideEtcdServiceOps(apiServer *cke.Node, svc *corev1.Service) cke.Operator
 	return nil
 }
 
-func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, isReady bool) (ops []cke.Operator) {
+func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, isReady bool) (ops []cke.Operator) {
 	for _, res := range static.Resources {
 		// To avoid thundering herd problem. Deployments need to be created only after enough nodes become ready.
 		if res.Kind == cke.KindDeployment && !isReady {
@@ -631,6 +631,12 @@ func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, reso
 			ops = append(ops, op.ResourceApplyOp(apiServer, res, !status.HasBeenSSA))
 		}
 	}
+	for key, value := range deletionResources {
+		if ks.ResourcesExistence[key] {
+			ops = append(ops, op.ResourceDeleteOp(apiServer, key, value))
+		}
+	}
+
 	return ops
 }
diff --git a/server/strategy_test.go b/server/strategy_test.go
index e10dfe09d..dfed4eeca 100644
--- a/server/strategy_test.go
+++ b/server/strategy_test.go
@@ -58,11 +58,12 @@ var (
 )
 
 type testData struct {
-	Cluster     *cke.Cluster
-	Status      *cke.ClusterStatus
-	Constraints *cke.Constraints
-	Resources   []cke.ResourceDefinition
-	RebootArgs  DecideOpsRebootArgs
+	Cluster           *cke.Cluster
+	Status            *cke.ClusterStatus
+	Constraints       *cke.Constraints
+	Resources         []cke.ResourceDefinition
+	DeletionResources map[string][]byte
+	RebootArgs        DecideOpsRebootArgs
 }
 
 func (d testData) ControlPlane() (nodes []*cke.Node) {
@@ -2188,7 +2189,7 @@ func TestDecideOps(t *testing.T) {
 
 	for _, c := range cases {
 		t.Run(c.Name, func(t *testing.T) {
-			ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, &Config{
+			ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.DeletionResources, c.Input.RebootArgs, &Config{
 				Interval:             0,
 				CertsGCInterval:      0,
 				MaxConcurrentUpdates: 5,
diff --git a/status.go b/status.go
index 5415f0914..c156ae5d1 100644
--- a/status.go
+++ b/status.go
@@ -40,6 +40,7 @@ type KubernetesClusterStatus struct {
 	EtcdEndpoints      *corev1.Endpoints
 	EtcdEndpointSlice  *discoveryv1.EndpointSlice
 	ResourceStatuses   map[string]ResourceStatus
+	ResourcesExistence map[string]bool
 }
 
 // ResourceStatus represents the status of registered K8s resources
diff --git a/storage.go b/storage.go
index 4081a1743..37ef81652 100644
--- a/storage.go
+++ b/storage.go
@@ -24,26 +24,27 @@ type RecordChan <-chan *Record
 
 // etcd keys and prefixes
 const (
-	KeyCA                    = "ca/"
-	KeyConfigVersion         = "config-version"
-	KeyCluster               = "cluster"
-	KeyClusterRevision       = "cluster-revision"
-	KeyConstraints           = "constraints"
-	KeyLeader                = "leader/"
-	KeyRebootsDisabled       = "reboots/disabled"
-	KeyRebootsPrefix         = "reboots/data/"
-	KeyRebootsWriteIndex     = "reboots/write-index"
-	KeyRecords               = "records/"
-	KeyRecordID              = "records"
-	KeyResourcePrefix        = "resource/"
-	KeySabakanDisabled       = "sabakan/disabled"
-	KeySabakanQueryVariables = "sabakan/query-variables"
-	KeySabakanTemplate       = "sabakan/template"
-	KeySabakanURL            = "sabakan/url"
-	KeyServiceAccountCert    = "service-account/certificate"
-	KeyServiceAccountKey     = "service-account/key"
-	KeyStatus                = "status"
-	KeyVault                 = "vault"
+	KeyCA                     = "ca/"
+	KeyConfigVersion          = "config-version"
+	KeyCluster                = "cluster"
+	KeyClusterRevision        = "cluster-revision"
+	KeyConstraints            = "constraints"
+	KeyDeletionResourcePrefix = "deletion/"
+	KeyLeader                 = "leader/"
+	KeyRebootsDisabled        = "reboots/disabled"
+	KeyRebootsPrefix          = "reboots/data/"
+	KeyRebootsWriteIndex      = "reboots/write-index"
+	KeyRecords                = "records/"
+	KeyRecordID               = "records"
+	KeyResourcePrefix         = "resource/"
+	KeySabakanDisabled        = "sabakan/disabled"
+	KeySabakanQueryVariables  = "sabakan/query-variables"
+	KeySabakanTemplate        = "sabakan/template"
+	KeySabakanURL             = "sabakan/url"
+	KeyServiceAccountCert     = "service-account/certificate"
+	KeyServiceAccountKey      = "service-account/key"
+	KeyStatus                 = "status"
+	KeyVault                  = "vault"
 )
 
 const maxRecords = 1000
@@ -583,9 +584,42 @@ func (s Storage) SetResource(ctx context.Context, key, value string) error {
 	return err
 }
 
-// DeleteResource removes a user resource from etcd.
+// DeleteResource moves a user resource from the resource list to the deletion list.
 func (s Storage) DeleteResource(ctx context.Context, key string) error {
-	_, err := s.Delete(ctx, KeyResourcePrefix+key)
+	val, rev, err := s.GetResource(ctx, key)
+	if err != nil {
+		return err
+	}
+	_, err = s.Txn(ctx).
+		If(clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", rev)).
+		Then(
+			clientv3.OpDelete(KeyResourcePrefix+key),
+			clientv3.OpPut(KeyDeletionResourcePrefix+key, string(val)),
+		).Commit()
+	return err
+}
+
+// GetAllDeletionResources gets all user-defined resources in the deletion list.
+func (s Storage) GetAllDeletionResources(ctx context.Context) (map[string][]byte, error) {
+	resp, err := s.Get(ctx, KeyDeletionResourcePrefix, clientv3.WithPrefix())
+	if err != nil {
+		return nil, err
+	}
+	if len(resp.Kvs) == 0 {
+		return nil, nil
+	}
+
+	rcs := make(map[string][]byte, len(resp.Kvs))
+	for _, kv := range resp.Kvs {
+		key := string(kv.Key[len(KeyDeletionResourcePrefix):])
+		rcs[key] = kv.Value
+	}
+	return rcs, nil
+}
+
+// DeleteDeletionResource removes a user resource from the deletion list.
+func (s Storage) DeleteDeletionResource(ctx context.Context, key string) error {
+	_, err := s.Delete(ctx, KeyDeletionResourcePrefix+key)
 	return err
 }
 
@@ -614,9 +648,10 @@ RETRY:
 		ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", cur.Revision))
 		thenOps = append(thenOps, clientv3.OpPut(KeyResourcePrefix+key, value))
 	}
 	for key, r := range currentResources {
 		ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", r.Revision))
 		thenOps = append(thenOps, clientv3.OpDelete(KeyResourcePrefix+key))
+		thenOps = append(thenOps, clientv3.OpPut(KeyDeletionResourcePrefix+key, string(r.Definition)))
 	}
 
 	txnResp, err := s.Txn(ctx).