Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove user resources with ckecli resource set sub-command. #664

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions op/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,63 @@ func (o *resourceApplyOp) Command() cke.Command {
Target: o.resource.String(),
}
}

// resourceDeleteOp is an Operator that removes a single user-defined
// Kubernetes resource from the cluster through the API server.
type resourceDeleteOp struct {
	apiserver    *cke.Node // API server node used to issue the deletion
	resourceKey  string    // storage key identifying the resource to delete
	resourceData []byte    // serialized manifest of the resource to delete

	finished bool // set once NextCommand has handed out the op
}

// ResourceDeleteOp returns an Operator that deletes the Kubernetes object
// identified by resourceKey. resourceData is the serialized manifest of the
// object used to locate it in the cluster.
func ResourceDeleteOp(apiServer *cke.Node, resourceKey string, resourceData []byte) cke.Operator {
	return &resourceDeleteOp{
		apiserver:    apiServer,
		resourceKey:  resourceKey,
		resourceData: resourceData,
	}
}

// Name returns the identifier of this operation.
func (o *resourceDeleteOp) Name() string {
	const opName = "resource-deletion"
	return opName
}

// NextCommand returns this op itself exactly once, then nil on every
// subsequent call.
func (o *resourceDeleteOp) NextCommand() cke.Commander {
	if !o.finished {
		o.finished = true
		return o
	}
	return nil
}

// Targets returns the address of the API server node this op acts on.
func (o *resourceDeleteOp) Targets() []string {
	targets := make([]string, 0, 1)
	targets = append(targets, o.apiserver.Address)
	return targets
}

// Run connects to the API server and deletes the resource identified by
// o.resourceKey using its stored manifest data.
func (o *resourceDeleteOp) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
	cfg, err := inf.K8sConfig(ctx, o.apiserver)
	if err != nil {
		return err
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		return err
	}

	dynamicClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return err
	}

	// The deferred mapper resolves GroupKind -> resource lazily via the
	// cached discovery client.
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))

	return cke.DeleteResource(ctx, dynamicClient, restMapper, inf, o.resourceKey, o.resourceData)
}

// Command returns the command description for this op.
func (o *resourceDeleteOp) Command() cke.Command {
	cmd := cke.Command{
		Name:   "delete-resource",
		Target: o.resourceKey,
	}
	return cmd
}
37 changes: 37 additions & 0 deletions op/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,43 @@ func GetKubernetesClusterStatus(ctx context.Context, inf cke.Infrastructure, n *
s.SetResourceStatus(res.Key, obj.GetAnnotations(), len(obj.GetManagedFields()) != 0)
}

deletionResources, err := inf.Storage().GetAllDeletionResources(ctx)
if err != nil {
return cke.KubernetesClusterStatus{}, err
}

s.ResourcesExistence = make(map[string]bool)
for key, val := range deletionResources {

obj := &unstructured.Unstructured{}
_, gvk, err := decUnstructured.Decode(val, nil, obj)
if err != nil {
return cke.KubernetesClusterStatus{}, err
}

mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return cke.KubernetesClusterStatus{}, fmt.Errorf("failed to find rest mapping for %s: %w", gvk.String(), err)
}

var dr dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
ns := obj.GetNamespace()
if ns == "" {
return cke.KubernetesClusterStatus{}, fmt.Errorf("no namespace for %s: name=%s", gvk.String(), obj.GetName())
}
dr = dyn.Resource(mapping.Resource).Namespace(ns)
} else {
dr = dyn.Resource(mapping.Resource)
}

_, err = dr.Get(ctx, obj.GetName(), metav1.GetOptions{})
if err != nil && !k8serr.IsNotFound(err) {
return cke.KubernetesClusterStatus{}, err
}
s.ResourcesExistence[key] = !k8serr.IsNotFound(err)
}

return s, nil
}

Expand Down
22 changes: 9 additions & 13 deletions pkg/ckecli/cmd/resource_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@ import (
k8sYaml "k8s.io/apimachinery/pkg/util/yaml"
)

func updateResource(ctx context.Context, data []byte) error {
key, err := cke.ParseResource(data)
if err != nil {
return err
}

return storage.SetResource(ctx, key, string(data))
}

var resourceSetCmd = &cobra.Command{
Use: "set FILE",
Short: "register user-defined resources.",
Long: `Register user-defined resources.

FILE should contain multiple Kubernetes resources in YAML or JSON format.
If FILE is "-", then data is read from stdin.`,
If FILE is "-", then data is read from stdin.

If a resource with the same key as a registered resource is specified, the resource will be overwritten.
If a resource exists in a registered resource but not in the specified resource, the resource will be deleted.`,

Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -43,21 +37,23 @@ If FILE is "-", then data is read from stdin.`,
}

well.Go(func(ctx context.Context) error {
newResources := make(map[string]string)
y := k8sYaml.NewYAMLReader(bufio.NewReader(r))
for {
data, err := y.Read()
if err == io.EOF {
return nil
break
}
if err != nil {
return err
}

err = updateResource(ctx, data)
key, err := cke.ParseResource(data)
if err != nil {
return err
}
newResources[key] = string(data)
}
return storage.ReplaceResources(ctx, newResources)
})
well.Stop()
return well.Wait()
Expand Down
35 changes: 35 additions & 0 deletions resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,41 @@ func ApplyResource(ctx context.Context, dynclient dynamic.Interface, mapper meta
return err
}

// DeleteResource deletes the Kubernetes object described by data from the
// cluster reachable via dynclient, and on success removes the corresponding
// entry (key) from the deletion-resource storage.
//
// data must be a YAML or JSON manifest decodable into an unstructured object.
// The returned error is the decode, mapping, API deletion, or storage error,
// whichever occurs first.
func DeleteResource(ctx context.Context, dynclient dynamic.Interface, mapper meta.RESTMapper, inf Infrastructure, key string, data []byte) error {
	obj := &unstructured.Unstructured{}
	_, gvk, err := decUnstructured.Decode(data, nil, obj)
	if err != nil {
		return fmt.Errorf("failed to decode data into *Unstructured: %w", err)
	}

	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return fmt.Errorf("failed to find REST mapping for %s: %w", gvk.String(), err)
	}

	// Namespaced resources must be deleted through a namespace-scoped client.
	var dr dynamic.ResourceInterface
	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
		dr = dynclient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
	} else {
		dr = dynclient.Resource(mapping.Resource)
	}

	if log.Enabled(log.LvDebug) {
		log.Debug("resource-deletion", map[string]interface{}{
			"gvk":       gvk.String(),
			"gvr":       mapping.Resource.String(),
			"namespace": obj.GetNamespace(),
			"name":      obj.GetName(),
		})
	}

	if err := dr.Delete(ctx, obj.GetName(), metav1.DeleteOptions{}); err != nil {
		return err
	}

	// Remove the storage entry only after the object is actually deleted so
	// that a failed API deletion is retried on a later reconciliation.
	return inf.Storage().DeleteDeletionResource(ctx, key)
}

func injectCA(ctx context.Context, st Storage, obj *unstructured.Unstructured, gvk *schema.GroupVersionKind) error {
cacert, err := st.GetCACertificate(ctx, CAWebhook)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
if err != nil {
return err
}
dels, err := inf.Storage().GetAllDeletionResources(ctx)
if err != nil {
return err
}

rqEntries, err := inf.Storage().GetRebootsEntries(ctx)
if err != nil {
Expand Down Expand Up @@ -338,7 +342,7 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
drainCompleted, drainTimedout, _ := op.CheckDrainCompletion(ctx, inf, nf.HealthyAPIServer(), cluster, rqEntries)
rebootDequeued := op.CheckRebootDequeue(ctx, cluster, rqEntries)

ops, phase := DecideOps(cluster, status, constraints, rcs, DecideOpsRebootArgs{
ops, phase := DecideOps(cluster, status, constraints, rcs, dels, DecideOpsRebootArgs{
RQEntries: rqEntries,
NewlyDrained: newlyDrained,
DrainCompleted: drainCompleted,
Expand Down
16 changes: 11 additions & 5 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type DecideOpsRebootArgs struct {

// DecideOps returns the next operations to do and the operation phase.
// This returns nil when no operations need to be done.
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rebootArgs DecideOpsRebootArgs, config *Config) ([]cke.Operator, cke.OperationPhase) {
nf := NewNodeFilter(c, cs)

// 0. Execute upgrade operation if necessary
Expand Down Expand Up @@ -77,7 +77,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
}

// 7. Maintain k8s resources.
if ops := k8sMaintOps(c, cs, resources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
if ops := k8sMaintOps(c, cs, resources, deletionResources, rebootArgs.RQEntries, rebootArgs.NewlyDrained, nf); len(ops) > 0 {
return ops, cke.PhaseK8sMaintain
}

Expand Down Expand Up @@ -238,15 +238,15 @@ func etcdMaintOp(c *cke.Cluster, nf *NodeFilter) cke.Operator {
return nil
}

func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, rqEntries []*cke.RebootQueueEntry, newlyDrained []*cke.RebootQueueEntry, nf *NodeFilter) (ops []cke.Operator) {
ks := cs.Kubernetes
apiServer := nf.HealthyAPIServer()

if !ks.IsControlPlaneReady {
return []cke.Operator{op.KubeWaitOp(apiServer)}
}

ops = append(ops, decideResourceOps(apiServer, ks, resources, ks.IsReady(c))...)
ops = append(ops, decideResourceOps(apiServer, ks, resources, deletionResources, ks.IsReady(c))...)

ops = append(ops, decideClusterDNSOps(apiServer, c, ks)...)

Expand Down Expand Up @@ -611,7 +611,7 @@ func decideEtcdServiceOps(apiServer *cke.Node, svc *corev1.Service) cke.Operator
return nil
}

func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, isReady bool) (ops []cke.Operator) {
func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, resources []cke.ResourceDefinition, deletionResources map[string][]byte, isReady bool) (ops []cke.Operator) {
for _, res := range static.Resources {
// To avoid thundering herd problem. Deployments need to be created only after enough nodes become ready.
if res.Kind == cke.KindDeployment && !isReady {
Expand All @@ -631,6 +631,12 @@ func decideResourceOps(apiServer *cke.Node, ks cke.KubernetesClusterStatus, reso
ops = append(ops, op.ResourceApplyOp(apiServer, res, !status.HasBeenSSA))
}
}
for key, value := range deletionResources {
if ks.ResourcesExistence[key] {
ops = append(ops, op.ResourceDeleteOp(apiServer, key, value))
}
}

return ops
}

Expand Down
13 changes: 7 additions & 6 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ var (
)

// testData bundles all inputs for a single DecideOps test case.
type testData struct {
	Cluster           *cke.Cluster             // desired cluster configuration
	Status            *cke.ClusterStatus       // observed cluster status
	Constraints       *cke.Constraints         // cluster constraints
	Resources         []cke.ResourceDefinition // user-defined resources to apply
	DeletionResources map[string][]byte        // serialized resources to delete, keyed by resource key
	RebootArgs        DecideOpsRebootArgs      // reboot-queue related arguments
}

func (d testData) ControlPlane() (nodes []*cke.Node) {
Expand Down Expand Up @@ -2188,7 +2189,7 @@ func TestDecideOps(t *testing.T) {

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.RebootArgs, &Config{
ops, _ := DecideOps(c.Input.Cluster, c.Input.Status, c.Input.Constraints, c.Input.Resources, c.Input.DeletionResources, c.Input.RebootArgs, &Config{
Interval: 0,
CertsGCInterval: 0,
MaxConcurrentUpdates: 5,
Expand Down
1 change: 1 addition & 0 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type KubernetesClusterStatus struct {
EtcdEndpoints *corev1.Endpoints
EtcdEndpointSlice *discoveryv1.EndpointSlice
ResourceStatuses map[string]ResourceStatus
ResourcesExistence map[string]bool
}

// ResourceStatus represents the status of registered K8s resources
Expand Down
Loading
Loading