diff --git a/pkg/ckecli/cmd/resource_set.go b/pkg/ckecli/cmd/resource_set.go index 2ea8aa585..825aff906 100644 --- a/pkg/ckecli/cmd/resource_set.go +++ b/pkg/ckecli/cmd/resource_set.go @@ -43,21 +43,23 @@ If FILE is "-", then data is read from stdin.`, } well.Go(func(ctx context.Context) error { + newResources := make(map[string]string) y := k8sYaml.NewYAMLReader(bufio.NewReader(r)) for { data, err := y.Read() if err == io.EOF { - return nil + break } if err != nil { return err } - - err = updateResource(ctx, data) + key, err := cke.ParseResource(data) if err != nil { return err } + newResources[key] = string(data) } + return storage.ReplaceResources(ctx, newResources) }) well.Stop() return well.Wait() diff --git a/storage.go b/storage.go index 5895b8a85..4081a1743 100644 --- a/storage.go +++ b/storage.go @@ -589,6 +589,53 @@ func (s Storage) DeleteResource(ctx context.Context, key string) error { return err } +// ReplaceResources replaces all user resources with new ones. +func (s Storage) ReplaceResources(ctx context.Context, newResources map[string]string) error { +RETRY: + current, err := s.GetAllResources(ctx) + if err != nil { + return err + } + currentResources := make(map[string]ResourceDefinition) + for _, r := range current { + currentResources[r.Key] = r + } + + var ifOps []clientv3.Cmp + var thenOps []clientv3.Op + + for key, value := range newResources { + cur := currentResources[key] + if value == string(cur.Definition) { + continue + } + + ifOps = append(ifOps, clientv3.Compare(clientv3.ModRevision(KeyResourcePrefix+key), "=", cur.Revision)) + thenOps = append(thenOps, clientv3.OpPut(KeyResourcePrefix+key, value)) + + if _, ok := currentResources[key]; ok { + delete(currentResources, key) + } + } + for key := range currentResources { + ifOps = append(ifOps, clientv3util.KeyMissing(KeyResourcePrefix+key)) + thenOps = append(thenOps, clientv3.OpDelete(KeyResourcePrefix+key)) + } + + txnResp, err := s.Txn(ctx). + If(ifOps...). + Then(thenOps...). + Commit() + + if err != nil { + return err + } + if !txnResp.Succeeded { + goto RETRY + } + return nil +} + // IsSabakanDisabled returns true if sabakan integration is disabled. func (s Storage) IsSabakanDisabled(ctx context.Context) (bool, error) { resp, err := s.Get(ctx, KeySabakanDisabled)