
Commit 329ce31

Once a user-defined resource is deleted, it should be deleted from Kubernetes cluster.

zoetrope committed Sep 8, 2023
1 parent 8969260 commit 329ce31
Showing 8 changed files with 215 additions and 40 deletions.
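Before this change, deleting a user-defined resource simply dropped its key from etcd; now the definition is moved to a deletion list so the server can also remove the object from the Kubernetes cluster. As a rough standalone illustration (not code from this commit), the move performed by Storage.DeleteResource in storage.go below boils down to a revision-guarded etcd transaction like this; the endpoint and the resource key are hypothetical:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Hypothetical endpoint; CKE uses its own etcd client wrapped by cke.Storage.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	const key = "ConfigMap/default/example" // hypothetical user resource key

	// Read the current definition and remember its revision.
	resp, err := cli.Get(ctx, "resource/"+key)
	if err != nil {
		panic(err)
	}
	if len(resp.Kvs) == 0 {
		panic("resource not found")
	}
	kv := resp.Kvs[0]

	// Atomically move the definition from the resource list to the deletion
	// list, guarded against concurrent updates by the revision check.
	txnResp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision("resource/"+key), "=", kv.ModRevision)).
		Then(
			clientv3.OpDelete("resource/"+key),
			clientv3.OpPut("deletion/"+key, string(kv.Value)),
		).
		Commit()
	if err != nil {
		panic(err)
	}
	fmt.Println("moved:", txnResp.Succeeded)
}

Keeping the definition around under a separate prefix is what lets GetKubernetesClusterStatus later check whether the object still exists in the cluster and lets decideResourceOps schedule a ResourceDeleteOp for it.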
60 changes: 60 additions & 0 deletions op/resource.go
@@ -69,3 +69,63 @@ func (o *resourceApplyOp) Command() cke.Command {
Target: o.resource.String(),
}
}

type resourceDeleteOp struct {
apiserver *cke.Node
resourceKey string
resourceData []byte

finished bool
}

// ResourceDeleteOp deletes a Kubernetes object from the cluster.
func ResourceDeleteOp(apiServer *cke.Node, resourceKey string, resourceData []byte) cke.Operator {
return &resourceDeleteOp{
apiserver: apiServer,
resourceKey: resourceKey,
resourceData: resourceData,
}
}

func (o *resourceDeleteOp) Name() string {
return "resource-deletion"
}

func (o *resourceDeleteOp) NextCommand() cke.Commander {
if o.finished {
return nil
}
o.finished = true
return o
}

func (o *resourceDeleteOp) Targets() []string {
return []string{
o.apiserver.Address,
}
}

func (o *resourceDeleteOp) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
cfg, err := inf.K8sConfig(ctx, o.apiserver)
if err != nil {
return err
}
dc, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return err
}
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))

dyn, err := dynamic.NewForConfig(cfg)
if err != nil {
return err
}
return cke.DeleteResource(ctx, dyn, mapper, inf, o.resourceKey, o.resourceData)
}

func (o *resourceDeleteOp) Command() cke.Command {
return cke.Command{
Name: "delete-resource",
Target: o.resourceKey,
}
}
37 changes: 37 additions & 0 deletions op/status.go
@@ -439,6 +439,43 @@ func GetKubernetesClusterStatus(ctx context.Context, inf cke.Infrastructure, n *
s.SetResourceStatus(res.Key, obj.GetAnnotations(), len(obj.GetManagedFields()) != 0)
}

deletionResources, err := inf.Storage().GetAllDeletionResources(ctx)
if err != nil {
return cke.KubernetesClusterStatus{}, err
}

s.ResourcesExistence = make(map[string]bool)
for key, val := range deletionResources {

obj := &unstructured.Unstructured{}
_, gvk, err := decUnstructured.Decode(val, nil, obj)
if err != nil {
return cke.KubernetesClusterStatus{}, err
}

mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return cke.KubernetesClusterStatus{}, fmt.Errorf("failed to find rest mapping for %s: %w", gvk.String(), err)
}

var dr dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
ns := obj.GetNamespace()
if ns == "" {
return cke.KubernetesClusterStatus{}, fmt.Errorf("no namespace for %s: name=%s", gvk.String(), obj.GetName())
}
dr = dyn.Resource(mapping.Resource).Namespace(ns)
} else {
dr = dyn.Resource(mapping.Resource)
}

_, err = dr.Get(ctx, obj.GetName(), metav1.GetOptions{})
if err != nil && !k8serr.IsNotFound(err) {
return cke.KubernetesClusterStatus{}, err
}
s.ResourcesExistence[key] = !k8serr.IsNotFound(err)
}

return s, nil
}

35 changes: 35 additions & 0 deletions resource.go
@@ -102,6 +102,41 @@ func ApplyResource(ctx context.Context, dynclient dynamic.Interface, mapper meta
return err
}

// DeleteResource deletes a Kubernetes object described by data and then removes it from the deletion list in etcd.
func DeleteResource(ctx context.Context, dynclient dynamic.Interface, mapper meta.RESTMapper, inf Infrastructure, key string, data []byte) error {
obj := &unstructured.Unstructured{}
_, gvk, err := decUnstructured.Decode(data, nil, obj)
if err != nil {
return fmt.Errorf("failed to decode data into *Unstructured: %w", err)
}

mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return fmt.Errorf("failed to find REST mapping for %s: %w", gvk.String(), err)
}

var dr dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dr = dynclient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
} else {
dr = dynclient.Resource(mapping.Resource)
}
if log.Enabled(log.LvDebug) {
log.Debug("resource-deletion", map[string]interface{}{
"gvk": gvk.String(),
"gvr": mapping.Resource.String(),
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
})
}

err = dr.Delete(ctx, obj.GetName(), metav1.DeleteOptions{})
if err != nil {
return err
}
err = inf.Storage().DeleteDeletionResource(ctx, key)
return err
}

func injectCA(ctx context.Context, st Storage, obj *unstructured.Unstructured, gvk *schema.GroupVersionKind) error {
cacert, err := st.GetCACertificate(ctx, CAWebhook)
if err != nil {
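The DeleteResource helper above and the existence check added to GetKubernetesClusterStatus in op/status.go share the same client-go plumbing: decode the stored manifest into an unstructured object, resolve its GVK to a GVR through a discovery-backed RESTMapper, then use the dynamic client. A self-contained sketch of that plumbing (not code from this commit), assuming a hypothetical kubeconfig path and manifest:

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical manifest; CKE passes the definition stored under the
	// "deletion/" prefix in etcd as raw bytes.
	manifest := []byte("apiVersion: v1\nkind: ConfigMap\nmetadata:\n  name: example\n  namespace: default\n")

	// Hypothetical kubeconfig path; CKE builds the rest.Config for the target API server itself.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// Decode the manifest into an unstructured object and extract its GVK.
	dec := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
	obj := &unstructured.Unstructured{}
	_, gvk, err := dec.Decode(manifest, nil, obj)
	if err != nil {
		panic(err)
	}

	// Map the GVK to a GVR via API discovery.
	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		panic(err)
	}
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		panic(err)
	}

	// Pick a namespaced or cluster-scoped resource interface.
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	var dr dynamic.ResourceInterface
	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
		dr = dyn.Resource(mapping.Resource).Namespace(obj.GetNamespace())
	} else {
		dr = dyn.Resource(mapping.Resource)
	}

	// The status check Gets the object; the delete op Deletes it.
	if _, err := dr.Get(context.Background(), obj.GetName(), metav1.GetOptions{}); err != nil {
		fmt.Println("get:", err)
	}
	if err := dr.Delete(context.Background(), obj.GetName(), metav1.DeleteOptions{}); err != nil {
		fmt.Println("delete:", err)
	}
}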
6 changes: 5 additions & 1 deletion server/control.go
@@ -309,6 +309,10 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
if err != nil {
return err
}
dels, err := inf.Storage().GetAllDeletionResources(ctx)
if err != nil {
return err
}

rqEntries, err := inf.Storage().GetRebootsEntries(ctx)
if err != nil {
@@ -338,7 +342,7 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
drainCompleted, drainTimedout, _ := op.CheckDrainCompletion(ctx, inf, nf.HealthyAPIServer(), cluster, rqEntries)
rebootDequeued := op.CheckRebootDequeue(ctx, cluster, rqEntries)

ops, phase := DecideOps(cluster, status, constraints, rcs, DecideOpsRebootArgs{
ops, phase := DecideOps(cluster, status, constraints, rcs, dels, DecideOpsRebootArgs{
RQEntries: rqEntries,
NewlyDrained: newlyDrained,
DrainCompleted: drainCompleted,
16 changes: 11 additions & 5 deletions server/strategy.go
@@ -24,7 +24,7 @@ type DecideOpsRebootArgs struct {

// DecideOps returns the next operations to do and the operation phase.
// This returns nil when no operations need to be done.
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
nf := NewNodeFilter(c, cs)

// 0. Execute upgrade operation if necessary
@@ -77,7 +77,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
}

// 7. Maintain k8s resources.
if ops := k8sMaintOps(c, cs, resources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
if ops := k8sMaintOps(c, cs, resources, deletionResources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
return ops, cke.PhaseK8sMaintain
}

@@ -238,15 +238,15 @@ func etcdMaintOp(c *cke.Cluster, nf *NodeFilter) cke.Operator {
return nil
}

func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
ks := cs.Kubernetes
apiServer := nf.HealthyAPIServer()

if !ks.IsControlPlaneReady {
return []cke.Operator{op.KubeWaitOp(apiServer)}
}

ops = append(ops, decideResourceOps(apiServer, ks, resources, ks.IsReady(c))...)
ops = append(ops, decideResourceOps(apiServer, ks, resources, deletionResources, ks.IsReady(c))...)

ops = append(ops, decideClusterDNSOps(apiServer, c, ks)...)

@@ -611,7 +611,7 @@ func decideEtcdServiceOps(apiServer *cke.Node, svc *corev1.Service) cke.Operator
return nil
}

func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, isReady bool) (ops []cke.Operator) {
func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, isReady bool) (ops []cke.Operator) {
for _, res := range static.Resources {
// To avoid thundering herd problem. Deployments need to be created only after enough nodes become ready.
if res.Kind == cke.KindDeployment && !isReady {
@@ -631,6 +631,12 @@ func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, reso
ops = append(ops, op.ResourceApplyOp(apiServer, res, !status.HasBeenSSA))
}
}
for key, value := range deletionResources {
if ks.ResourcesExistence[key] {
ops = append(ops, op.ResourceDeleteOp(apiServer, key, value))
}
}

return ops
}

13 changes: 7 additions & 6 deletions server/strategy_test.go
@@ -58,11 +58,12 @@ var (
)

type testData struct {
Cluster *cke.Cluster
Status *cke.ClusterStatus
Constraints *cke.Constraints
Resources []cke.ResourceDefinition
RebootArgs DecideOpsRebootArgs
Cluster *cke.Cluster
Status *cke.ClusterStatus
Constraints *cke.Constraints
Resources []cke.ResourceDefinition
DeletionResources map[string][]byte
RebootArgs DecideOpsRebootArgs
}

func (d testData) ControlPlane() (nodes []*cke.Node) {
@@ -2188,7 +2189,7 @@ func TestDecideOps(t *testing.T) {

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, &Config{
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.DeletionResources, c.Input.RebootArgs, &Config{
Interval: 0,
CertsGCInterval: 0,
MaxConcurrentUpdates: 5,
1 change: 1 addition & 0 deletions status.go
@@ -40,6 +40,7 @@ type KubernetesClusterStatus struct {
EtcdEndpoints *corev1.Endpoints
EtcdEndpointSlice *discoveryv1.EndpointSlice
ResourceStatuses map[string]ResourceStatus
ResourcesExistence map[string]bool
}

// ResourceStatus represents the status of registered K8s resources
87 changes: 59 additions & 28 deletions storage.go
@@ -24,26 +24,27 @@ type RecordChan <-chan *Record

// etcd keys and prefixes
const (
KeyCA = "ca/"
KeyConfigVersion = "config-version"
KeyCluster = "cluster"
KeyClusterRevision = "cluster-revision"
KeyConstraints = "constraints"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
KeyRecordID = "records"
KeyResourcePrefix = "resource/"
KeySabakanDisabled = "sabakan/disabled"
KeySabakanQueryVariables = "sabakan/query-variables"
KeySabakanTemplate = "sabakan/template"
KeySabakanURL = "sabakan/url"
KeyServiceAccountCert = "service-account/certificate"
KeyServiceAccountKey = "service-account/key"
KeyStatus = "status"
KeyVault = "vault"
KeyCA = "ca/"
KeyConfigVersion = "config-version"
KeyCluster = "cluster"
KeyClusterRevision = "cluster-revision"
KeyConstraints = "constraints"
KeyDeletionResourcePrefix = "deletion/"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
KeyRecordID = "records"
KeyResourcePrefix = "resource/"
KeySabakanDisabled = "sabakan/disabled"
KeySabakanQueryVariables = "sabakan/query-variables"
KeySabakanTemplate = "sabakan/template"
KeySabakanURL = "sabakan/url"
KeyServiceAccountCert = "service-account/certificate"
KeyServiceAccountKey = "service-account/key"
KeyStatus = "status"
KeyVault = "vault"
)

const maxRecords = 1000
@@ -583,9 +584,42 @@ func (s Storage) SetResource(ctx context.Context, key, value string) error {
return err
}

// DeleteResource removes a user resource from etcd.
// DeleteResource moves a user resource from the resource list to the deletion list.
func (s Storage) DeleteResource(ctx context.Context, key string) error {
_, err := s.Delete(ctx, KeyResourcePrefix+key)
val, rev, err := s.GetResource(ctx, key)
if err != nil {
return err
}
_, err = s.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", rev)).
Then(
clientv3.OpDelete(KeyResourcePrefix+key),
clientv3.OpPut(KeyDeletionResourcePrefix+key, string(val)),
).Commit()
return err
}

// GetAllDeletionResources gets all user-defined resources in the deletion list.
func (s Storage) GetAllDeletionResources(ctx context.Context) (map[string][]byte, error) {
resp, err := s.Get(ctx, KeyDeletionResourcePrefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil
}

rcs := make(map[string][]byte, len(resp.Kvs))
for _, kv := range resp.Kvs {
		key := string(kv.Key[len(KeyDeletionResourcePrefix):])
		rcs[key] = kv.Value
	}
return rcs, nil
}

// DeleteDeletionResource removes a user-defined resource from the deletion list.
func (s Storage) DeleteDeletionResource(ctx context.Context, key string) error {
_, err := s.Delete(ctx, KeyDeletionResourcePrefix+key)
return err
}

@@ -609,17 +643,14 @@ RETRY:
if value == string(cur.Definition) {
continue
}

ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", cur.Revision))
thenOps = append(thenOps, clientv3.OpPut(KeyResourcePrefix+key, value))

if _, ok := currentResources[key]; ok {
delete(currentResources, key)
}
delete(currentResources, key)
}
for key := range currentResources {
for key, val := range currentResources {
ifOps = append(ifOps, clientv3util.KeyMissing(KeyResourcePrefix+key))
thenOps = append(thenOps, clientv3.OpDelete(KeyResourcePrefix+key))
thenOps = append(thenOps, clientv3.OpPut(KeyDeletionResourcePrefix+key, string(val.Definition)))
}

txnResp, err := s.Txn(ctx).
