From d5654af625266b275e2670e4e1bebe2c9a4d02f1 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 25 Jun 2024 11:47:51 +0800 Subject: [PATCH] support listing works with its labels (#135) Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- pkg/api/resource_types.go | 44 +++- pkg/client/cloudevents/grpcsource/util.go | 84 +++++++ .../cloudevents/grpcsource/util_test.go | 76 ++++++ pkg/client/cloudevents/grpcsource/watch.go | 18 +- .../cloudevents/grpcsource/watcherstore.go | 45 +++- test/e2e/pkg/sourceclient_test.go | 230 +++++++++++++++++- 8 files changed, 468 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index ec12e08a..36166454 100755 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( k8s.io/klog/v2 v2.120.1 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 - open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd + open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 ) require ( diff --git a/go.sum b/go.sum index 5681fe25..3075cfab 100755 --- a/go.sum +++ b/go.sum @@ -825,8 +825,8 @@ open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZ open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0= open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI= open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4= -open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA= -open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 h1:HOvmoJ1CZF26EDkf4t53ZYztaSjM1LBFk1JuHTIffHU= +open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= diff --git a/pkg/api/resource_types.go b/pkg/api/resource_types.go index 165cba0f..4b1d8e83 100755 --- a/pkg/api/resource_types.go +++ b/pkg/api/resource_types.go @@ -16,6 +16,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" ) type ResourceType string @@ -93,9 +94,35 @@ type ResourcePatchRequest struct{} // JSONMAPToCloudEvent converts a JSONMap (resource manifest or status) to a CloudEvent func JSONMAPToCloudEvent(res datatypes.JSONMap) (*cloudevents.Event, error) { - resJSON, err := res.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + var err error + var resJSON []byte + + if metadata, ok := res[codec.ExtensionWorkMeta]; ok { + // cloudevents require its extension value as string, so we need convert the metadata object + // to string back + + // ensure the original resource will be not changed + resCopy := datatypes.JSONMap{} + for key, value := range res { + resCopy[key] = value + } + + metaJson, err := json.Marshal(metadata) + if err != nil { + return nil, err + } + + resCopy[codec.ExtensionWorkMeta] = string(metaJson) + + resJSON, err = resCopy.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + } + } else { + resJSON, err = res.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + } } evt := &cloudevents.Event{} @@ -118,6 +145,17 @@ func CloudEventToJSONMap(evt *cloudevents.Event) (datatypes.JSONMap, error) { return nil, fmt.Errorf("failed to unmarshal cloudevent JSON to JSONMAP: %v", err) } + if metadata, ok := res[codec.ExtensionWorkMeta]; ok { + // cloudevents treat its extension value as string, so we need convert metadata extension + // to an object for supporting to query the resources with metadata + objectMeta := map[string]any{} + + if err := json.Unmarshal([]byte(fmt.Sprintf("%s", metadata)), &objectMeta); err != nil { + return nil, err + } + res[codec.ExtensionWorkMeta] = objectMeta + } + return res, nil } diff --git a/pkg/client/cloudevents/grpcsource/util.go b/pkg/client/cloudevents/grpcsource/util.go index 966e701c..dd3ce696 100644 --- a/pkg/client/cloudevents/grpcsource/util.go +++ b/pkg/client/cloudevents/grpcsource/util.go @@ -2,15 +2,21 @@ package grpcsource import ( "encoding/json" + "fmt" + "strings" "github.com/openshift-online/maestro/pkg/api/openapi" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" ) +const jsonbPrefix = `payload->'metadata'->'labels'` + // ToManifestWork converts an openapi.ResourceBundle object to workv1.ManifestWork object func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) { work := &workv1.ManifestWork{} @@ -86,6 +92,84 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) { return work, nil } +func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, error) { + if len(opts.LabelSelector) == 0 { + return labels.Everything(), "", false, nil + } + + labelSelector, err := labels.Parse(opts.LabelSelector) + if err != nil { + return nil, "", false, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err) + } + + requirements, selectable := labelSelector.Requirements() + if !selectable { + return labels.Everything(), "", false, nil + } + + equalsLabels := []string{} + notEqualsLabels := []string{} + + existsKeys := []string{} + doesNotExistKeys := []string{} + + inLabels := map[string][]string{} + + // refer to below links to find how to use the label selector in kubernetes + // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#equality-based-requirement + // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#set-based-requirement + for _, requirement := range requirements { + switch requirement.Operator() { + case selection.Equals, selection.DoubleEquals: + values := requirement.Values() + if len(values) != 1 { + return nil, "", false, fmt.Errorf("too many values in equals operation") + } + + equalsLabels = append(equalsLabels, fmt.Sprintf(`"%s":"%s"`, requirement.Key(), values.List()[0])) + case selection.NotEquals: + values := requirement.Values() + if len(values) != 1 { + return nil, "", false, fmt.Errorf("too many values in not equals operation") + } + + notEqualsLabels = append(notEqualsLabels, fmt.Sprintf(`%s->>'%s'<>'%s'`, jsonbPrefix, requirement.Key(), values.List()[0])) + case selection.Exists: + existsKeys = append(existsKeys, fmt.Sprintf(`%s->>'%s'<>null`, jsonbPrefix, requirement.Key())) + case selection.In: + vals := []string{} + for val := range requirement.Values() { + vals = append(vals, fmt.Sprintf("'%s'", val)) + } + + inLabels[requirement.Key()] = vals + case selection.NotIn: + for val := range requirement.Values() { + notEqualsLabels = append(notEqualsLabels, fmt.Sprintf(`%s->>'%s'<>'%s'`, jsonbPrefix, requirement.Key(), val)) + } + default: + // only DoesNotExist cannot be supported + return nil, "", false, fmt.Errorf("unsupported operator %s", requirement.Operator()) + } + } + + labelSearch := []string{} + if len(equalsLabels) != 0 { + labelSearch = append(labelSearch, fmt.Sprintf(`%s@>'{%s}'`, jsonbPrefix, strings.Join(equalsLabels, ","))) + } + + if len(inLabels) != 0 { + for key, vals := range inLabels { + labelSearch = append(labelSearch, fmt.Sprintf(`%s->>'%s'in(%s)`, jsonbPrefix, key, strings.Join(vals, ","))) + } + } + + labelSearch = append(labelSearch, notEqualsLabels...) + labelSearch = append(labelSearch, existsKeys...) + labelSearch = append(labelSearch, doesNotExistKeys...) + return labelSelector, strings.Join(labelSearch, " and "), true, nil +} + func marshal(obj map[string]any) ([]byte, error) { unstructuredObj := unstructured.Unstructured{Object: obj} data, err := unstructuredObj.MarshalJSON() diff --git a/pkg/client/cloudevents/grpcsource/util_test.go b/pkg/client/cloudevents/grpcsource/util_test.go index a4fd4f7b..7e333d3c 100644 --- a/pkg/client/cloudevents/grpcsource/util_test.go +++ b/pkg/client/cloudevents/grpcsource/util_test.go @@ -6,6 +6,7 @@ import ( "github.com/openshift-online/maestro/pkg/api/openapi" "k8s.io/apimachinery/pkg/api/equality" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" workv1 "open-cluster-management.io/api/work/v1" @@ -130,3 +131,78 @@ func TestToManifestWork(t *testing.T) { }) } } + +func TestToLabelSearch(t *testing.T) { + cases := []struct { + name string + opts v1.ListOptions + expectedSelectable bool + expectedLabelSearch string + }{ + { + name: "no label selector", + opts: v1.ListOptions{}, + expectedSelectable: false, + expectedLabelSearch: "", + }, + { + name: "selector everything", + opts: v1.ListOptions{LabelSelector: labels.Everything().String()}, + expectedSelectable: false, + expectedLabelSearch: "", + }, + { + name: "one equals selector", + opts: v1.ListOptions{LabelSelector: "a=b"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'@>'{"a":"b"}'`, + }, + { + name: "multiple equals selector", + opts: v1.ListOptions{LabelSelector: "a=b,c==d"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'@>'{"a":"b","c":"d"}'`, + }, + { + name: "multiple not equals selector", + opts: v1.ListOptions{LabelSelector: "a!=b,c!=d"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'->>'a'<>'b' and payload->'metadata'->'labels'->>'c'<>'d'`, + }, + { + name: "exist selector", + opts: v1.ListOptions{LabelSelector: "a"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'->>'a'<>null`, + }, + { + name: "in selector", + opts: v1.ListOptions{LabelSelector: "env in (a)"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'->>'env'in('a')`, + }, + { + name: "not in selector", + opts: v1.ListOptions{LabelSelector: "env notin (a)"}, + expectedSelectable: true, + expectedLabelSearch: `payload->'metadata'->'labels'->>'env'<>'a'`, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, labelSearch, selectable, err := ToLabelSearch(c.opts) + if c.expectedSelectable != selectable { + t.Errorf("expected %v, but got %v", c.expectedSelectable, selectable) + } + + if c.expectedLabelSearch != labelSearch { + t.Errorf("expected %s, but got %s", c.expectedLabelSearch, labelSearch) + } + + if err != nil { + t.Errorf("expected error %v", err) + } + }) + } +} diff --git a/pkg/client/cloudevents/grpcsource/watch.go b/pkg/client/cloudevents/grpcsource/watch.go index b704cb22..1b03b1ea 100644 --- a/pkg/client/cloudevents/grpcsource/watch.go +++ b/pkg/client/cloudevents/grpcsource/watch.go @@ -4,6 +4,7 @@ import ( "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" workv1 "open-cluster-management.io/api/work/v1" @@ -17,16 +18,18 @@ type workWatcher struct { done chan struct{} stopped bool - namespace string + namespace string + labelSelector labels.Selector } var _ watch.Interface = &workWatcher{} -func newWorkWatcher(namespace string) *workWatcher { +func newWorkWatcher(namespace string, labelSelector labels.Selector) *workWatcher { return &workWatcher{ - result: make(chan watch.Event), - done: make(chan struct{}), - namespace: namespace, + result: make(chan watch.Event), + done: make(chan struct{}), + namespace: namespace, + labelSelector: labelSelector, } } @@ -68,6 +71,11 @@ func (w *workWatcher) Receive(evt watch.Event) { return } + if !w.labelSelector.Matches(labels.Set(work.GetLabels())) { + klog.V(4).Infof("ignore the label unmatched work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace) + return + } + w.result <- evt } diff --git a/pkg/client/cloudevents/grpcsource/watcherstore.go b/pkg/client/cloudevents/grpcsource/watcherstore.go index 262bb75b..f0185875 100644 --- a/pkg/client/cloudevents/grpcsource/watcherstore.go +++ b/pkg/client/cloudevents/grpcsource/watcherstore.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" @@ -68,13 +69,22 @@ func newRESTFulAPIWatcherStore(ctx context.Context, apiClient *openapi.APIClient // Using `metav1.NamespaceAll` to specify all namespaces. func (m *RESTFulAPIWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { // Only list works from maestro server with the given namespace when a watcher is required - search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + labelSelector, labelSearch, selectable, err := ToLabelSearch(opts) + if err != nil { + return nil, err + } + + searches := []string{fmt.Sprintf("source='%s'", m.sourceID)} if namespace != metav1.NamespaceAll { - search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace)) + searches = append(searches, fmt.Sprintf("consumer_name='%s'", namespace)) + } + + if selectable { + searches = append(searches, labelSearch) } rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). - Search(strings.Join(search, " and ")). + Search(strings.Join(searches, " and ")). Page(1). Size(-1). Execute() @@ -82,7 +92,7 @@ func (m *RESTFulAPIWatcherStore) GetWatcher(namespace string, opts metav1.ListOp return nil, err } - watcher := m.registerWatcher(namespace) + watcher := m.registerWatcher(namespace, labelSelector) // save the works to a queue for _, rb := range rbs.Items { @@ -143,14 +153,25 @@ func (m *RESTFulAPIWatcherStore) List(namespace string, opts metav1.ListOptions) size = int32(opts.Limit) } - // TODO filter works by labels - search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + _, labelSearch, selectable, err := ToLabelSearch(opts) + if err != nil { + return nil, err + } + + searches := []string{fmt.Sprintf("source='%s'", m.sourceID)} if namespace != metav1.NamespaceAll { - search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace)) + searches = append(searches, fmt.Sprintf("consumer_name='%s'", namespace)) } + if selectable { + searches = append(searches, labelSearch) + } + + search := strings.Join(searches, " and ") + klog.V(4).Infof("list works with search=%s", search) + rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). - Search(strings.Join(search, " and ")). + Search(search). Page(page). Size(size). Execute() @@ -210,10 +231,10 @@ func (m *RESTFulAPIWatcherStore) Sync() error { break } - namespaces = append(namespaces, fmt.Sprintf("consumer_name = '%s'", namespace)) + namespaces = append(namespaces, fmt.Sprintf("consumer_name='%s'", namespace)) } - search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + search := []string{fmt.Sprintf("source='%s'", m.sourceID)} if !hasAll { search = append(search, namespaces...) } @@ -273,7 +294,7 @@ func (m *RESTFulAPIWatcherStore) process() { } } -func (m *RESTFulAPIWatcherStore) registerWatcher(namespace string) watch.Interface { +func (m *RESTFulAPIWatcherStore) registerWatcher(namespace string, labelSelector labels.Selector) watch.Interface { m.Lock() defer m.Unlock() @@ -282,7 +303,7 @@ func (m *RESTFulAPIWatcherStore) registerWatcher(namespace string) watch.Interfa return watcher } - watcher = newWorkWatcher(namespace) + watcher = newWorkWatcher(namespace, labelSelector) m.watchers[namespace] = watcher return watcher } diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go index 5e45689a..8c22d0ba 100644 --- a/test/e2e/pkg/sourceclient_test.go +++ b/test/e2e/pkg/sourceclient_test.go @@ -12,12 +12,14 @@ import ( . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" workv1 "open-cluster-management.io/api/work/v1" @@ -30,36 +32,38 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() { var watcherCtx context.Context var watcherCancel context.CancelFunc - var firstInitWorkName string - var secondInitWorkName string + var initWorkAName string + var initWorkBName string BeforeEach(func() { watcherCtx, watcherCancel = context.WithCancel(context.Background()) // prepare two works firstly - firstInitWorkName = "first-init-work-" + rand.String(5) - secondInitWorkName = "second-init-work-" + rand.String(5) + initWorkAName = "init-work-a-" + rand.String(5) + work := NewManifestWorkWithLabels(initWorkAName, map[string]string{"app": "test"}) - _, err := workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(firstInitWorkName), metav1.CreateOptions{}) + _, err := workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) - _, err = workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(secondInitWorkName), metav1.CreateOptions{}) + initWorkBName = "init-work-b" + rand.String(5) + work = NewManifestWorkWithLabels(initWorkBName, map[string]string{"app": "test"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) AfterEach(func() { - err := workClient.ManifestWorks(consumer_name).Delete(ctx, firstInitWorkName, metav1.DeleteOptions{}) + err := workClient.ManifestWorks(consumer_name).Delete(ctx, initWorkAName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer_name).Delete(ctx, secondInitWorkName, metav1.DeleteOptions{}) + err = workClient.ManifestWorks(consumer_name).Delete(ctx, initWorkBName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { - if err := AssertWorkNotFound(firstInitWorkName); err != nil { + if err := AssertWorkNotFound(initWorkAName); err != nil { return err } - return AssertWorkNotFound(secondInitWorkName) + return AssertWorkNotFound(initWorkBName) }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) watcherCancel() @@ -162,6 +166,180 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() { return nil }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) }) + + It("The watchers with label selector", func() { + watcherClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + watcherCtx, + apiClient, + grpcOptions, + sourceID, + ) + Expect(err).ShouldNot(HaveOccurred()) + + By("start watching with label app=test") + watcher, err := watcherClient.ManifestWorks(consumer_name).Watch(watcherCtx, metav1.ListOptions{ + LabelSelector: "app=test", + }) + Expect(err).ShouldNot(HaveOccurred()) + result := StartWatch(watcherCtx, watcher) + + By("create a work by work client") + workName := "work-" + rand.String(5) + work := NewManifestWorkWithLabels(workName, map[string]string{"app": "test"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + // wait for few seconds to ensure the creation is finished + <-time.After(5 * time.Second) + + By("delete the work by work client") + err = workClient.ManifestWorks(consumer_name).Delete(ctx, workName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + return AssertWatchResult(result) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + }) + }) + + Context("List works with gRPC source ManifestWork client", func() { + var workName string + var prodWorkName string + var testWorkAName string + var testWorkBName string + var testWorkCName string + + BeforeEach(func() { + // prepare works firstly + workName = "work-" + rand.String(5) + work := NewManifestWork(workName) + _, err := workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + prodWorkName = "work-prod" + rand.String(5) + work = NewManifestWorkWithLabels(prodWorkName, map[string]string{"app": "nginx", "env": "prod"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + testWorkAName = "work-test-a-" + rand.String(5) + work = NewManifestWorkWithLabels(testWorkAName, map[string]string{"app": "nginx", "env": "test", "val": "a"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + testWorkBName = "work-test-b-" + rand.String(5) + work = NewManifestWorkWithLabels(testWorkBName, map[string]string{"app": "nginx", "env": "test", "val": "b"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + testWorkCName = "work-test-c-" + rand.String(5) + work = NewManifestWorkWithLabels(testWorkCName, map[string]string{"app": "nginx", "env": "test", "val": "c"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterEach(func() { + err := workClient.ManifestWorks(consumer_name).Delete(ctx, workName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, prodWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, testWorkAName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, testWorkBName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, testWorkCName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + if err := AssertWorkNotFound(workName); err != nil { + return err + } + + if err := AssertWorkNotFound(prodWorkName); err != nil { + return err + } + + if err := AssertWorkNotFound(testWorkAName); err != nil { + return err + } + + if err := AssertWorkNotFound(testWorkBName); err != nil { + return err + } + + return AssertWorkNotFound(testWorkCName) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("List works with options", func() { + By("list all works") + works, err := workClient.ManifestWorks(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, workName, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) + + By("list works by consumer name") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, workName, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) + + By("list works by nonexistent consumer") + works, err = workClient.ManifestWorks("nonexistent").List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) + + By("list works with nonexistent labels") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "nonexistent=true", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) + + By("list works with app label") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "app=nginx", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) + + By("list works without test env") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "app=nginx,env!=test", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, prodWorkName)).ShouldNot(HaveOccurred()) + + By("list works in prod and test env") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "env in (prod, test)", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) + + By("list works in test env and val not in a and b") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "env=test,val notin (a,b)", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, testWorkCName)).ShouldNot(HaveOccurred()) + + By("list works with val label") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "val", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) + + // TODO support does not exist + // By("list works without val label") + // works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + // LabelSelector: "!val", + // }) + // Expect(err).ShouldNot(HaveOccurred()) + // Expect(AssertWorks(works.Items, workName, prodWorkName)).ShouldNot(HaveOccurred()) + }) }) }) @@ -206,11 +384,11 @@ func AssertWatchResult(result *WatchedResult) error { hasDeletedWork := false for _, watchedWork := range result.WatchedWorks { - if strings.HasPrefix(watchedWork.Name, "first-init-work-") { + if strings.HasPrefix(watchedWork.Name, "init-work-a") { hasFirstInitWork = true } - if strings.HasPrefix(watchedWork.Name, "second-init-work-") { + if strings.HasPrefix(watchedWork.Name, "init-work-b") { hasSecondInitWork = true } @@ -255,6 +433,25 @@ func AssertWorkNotFound(name string) error { return fmt.Errorf("the work %s still exists", name) } +func AssertWorks(works []workv1.ManifestWork, expected ...string) error { + workNames := sets.Set[string]{} + expectedNames := sets.Set[string]{}.Insert(expected...) + + for _, work := range works { + workNames.Insert(work.Name) + } + + if len(expectedNames) != len(workNames) { + return fmt.Errorf("expected %v, but got %v", expectedNames, workNames) + } + + if !equality.Semantic.DeepEqual(expectedNames, workNames) { + return fmt.Errorf("expected %v, but got %v", expectedNames, workNames) + } + + return nil +} + func NewManifestWork(name string) *workv1.ManifestWork { return &workv1.ManifestWork{ ObjectMeta: metav1.ObjectMeta{ @@ -270,6 +467,12 @@ func NewManifestWork(name string) *workv1.ManifestWork { } } +func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.ManifestWork { + work := NewManifestWork(name) + work.Labels = labels + return work +} + func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { oldData, err := json.Marshal(old) if err != nil { @@ -297,6 +500,9 @@ func NewManifest(name string) workv1.Manifest { "metadata": map[string]interface{}{ "namespace": "default", "name": name, + "labels": map[string]string{ + "test": "true", + }, }, "data": map[string]string{ "test": rand.String(5),