Skip to content

Commit

Permalink
support listing works with its labels (#135)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored Jun 25, 2024
1 parent e63f40f commit d5654af
Show file tree
Hide file tree
Showing 8 changed files with 468 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZ
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2 h1:HOvmoJ1CZF26EDkf4t53ZYztaSjM1LBFk1JuHTIffHU=
open-cluster-management.io/sdk-go v0.14.1-0.20240624015756-723bc4d1e5b2/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
44 changes: 41 additions & 3 deletions pkg/api/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
workv1 "open-cluster-management.io/api/work/v1"
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

type ResourceType string
Expand Down Expand Up @@ -93,9 +94,35 @@ type ResourcePatchRequest struct{}

// JSONMAPToCloudEvent converts a JSONMap (resource manifest or status) to a CloudEvent
func JSONMAPToCloudEvent(res datatypes.JSONMap) (*cloudevents.Event, error) {
resJSON, err := res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
var err error
var resJSON []byte

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents require its extension value as string, so we need convert the metadata object
// to string back

// ensure the original resource will be not changed
resCopy := datatypes.JSONMap{}
for key, value := range res {
resCopy[key] = value
}

metaJson, err := json.Marshal(metadata)
if err != nil {
return nil, err
}

resCopy[codec.ExtensionWorkMeta] = string(metaJson)

resJSON, err = resCopy.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
} else {
resJSON, err = res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
}

evt := &cloudevents.Event{}
Expand All @@ -118,6 +145,17 @@ func CloudEventToJSONMap(evt *cloudevents.Event) (datatypes.JSONMap, error) {
return nil, fmt.Errorf("failed to unmarshal cloudevent JSON to JSONMAP: %v", err)
}

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents treat its extension value as string, so we need convert metadata extension
// to an object for supporting to query the resources with metadata
objectMeta := map[string]any{}

if err := json.Unmarshal([]byte(fmt.Sprintf("%s", metadata)), &objectMeta); err != nil {
return nil, err
}
res[codec.ExtensionWorkMeta] = objectMeta
}

return res, nil
}

Expand Down
84 changes: 84 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@ package grpcsource

import (
"encoding/json"
"fmt"
"strings"

"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)

const jsonbPrefix = `payload->'metadata'->'labels'`

// ToManifestWork converts an openapi.ResourceBundle object to workv1.ManifestWork object
func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
work := &workv1.ManifestWork{}
Expand Down Expand Up @@ -86,6 +92,84 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
return work, nil
}

func ToLabelSearch(opts metav1.ListOptions) (labels.Selector, string, bool, error) {
if len(opts.LabelSelector) == 0 {
return labels.Everything(), "", false, nil
}

labelSelector, err := labels.Parse(opts.LabelSelector)
if err != nil {
return nil, "", false, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err)
}

requirements, selectable := labelSelector.Requirements()
if !selectable {
return labels.Everything(), "", false, nil
}

equalsLabels := []string{}
notEqualsLabels := []string{}

existsKeys := []string{}
doesNotExistKeys := []string{}

inLabels := map[string][]string{}

// refer to below links to find how to use the label selector in kubernetes
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#equality-based-requirement
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#set-based-requirement
for _, requirement := range requirements {
switch requirement.Operator() {
case selection.Equals, selection.DoubleEquals:
values := requirement.Values()
if len(values) != 1 {
return nil, "", false, fmt.Errorf("too many values in equals operation")
}

equalsLabels = append(equalsLabels, fmt.Sprintf(`"%s":"%s"`, requirement.Key(), values.List()[0]))
case selection.NotEquals:
values := requirement.Values()
if len(values) != 1 {
return nil, "", false, fmt.Errorf("too many values in not equals operation")
}

notEqualsLabels = append(notEqualsLabels, fmt.Sprintf(`%s->>'%s'<>'%s'`, jsonbPrefix, requirement.Key(), values.List()[0]))
case selection.Exists:
existsKeys = append(existsKeys, fmt.Sprintf(`%s->>'%s'<>null`, jsonbPrefix, requirement.Key()))
case selection.In:
vals := []string{}
for val := range requirement.Values() {
vals = append(vals, fmt.Sprintf("'%s'", val))
}

inLabels[requirement.Key()] = vals
case selection.NotIn:
for val := range requirement.Values() {
notEqualsLabels = append(notEqualsLabels, fmt.Sprintf(`%s->>'%s'<>'%s'`, jsonbPrefix, requirement.Key(), val))
}
default:
// only DoesNotExist cannot be supported
return nil, "", false, fmt.Errorf("unsupported operator %s", requirement.Operator())
}
}

labelSearch := []string{}
if len(equalsLabels) != 0 {
labelSearch = append(labelSearch, fmt.Sprintf(`%s@>'{%s}'`, jsonbPrefix, strings.Join(equalsLabels, ",")))
}

if len(inLabels) != 0 {
for key, vals := range inLabels {
labelSearch = append(labelSearch, fmt.Sprintf(`%s->>'%s'in(%s)`, jsonbPrefix, key, strings.Join(vals, ",")))
}
}

labelSearch = append(labelSearch, notEqualsLabels...)
labelSearch = append(labelSearch, existsKeys...)
labelSearch = append(labelSearch, doesNotExistKeys...)
return labelSelector, strings.Join(labelSearch, " and "), true, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
76 changes: 76 additions & 0 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
Expand Down Expand Up @@ -130,3 +131,78 @@ func TestToManifestWork(t *testing.T) {
})
}
}

func TestToLabelSearch(t *testing.T) {
cases := []struct {
name string
opts v1.ListOptions
expectedSelectable bool
expectedLabelSearch string
}{
{
name: "no label selector",
opts: v1.ListOptions{},
expectedSelectable: false,
expectedLabelSearch: "",
},
{
name: "selector everything",
opts: v1.ListOptions{LabelSelector: labels.Everything().String()},
expectedSelectable: false,
expectedLabelSearch: "",
},
{
name: "one equals selector",
opts: v1.ListOptions{LabelSelector: "a=b"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'@>'{"a":"b"}'`,
},
{
name: "multiple equals selector",
opts: v1.ListOptions{LabelSelector: "a=b,c==d"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'@>'{"a":"b","c":"d"}'`,
},
{
name: "multiple not equals selector",
opts: v1.ListOptions{LabelSelector: "a!=b,c!=d"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'->>'a'<>'b' and payload->'metadata'->'labels'->>'c'<>'d'`,
},
{
name: "exist selector",
opts: v1.ListOptions{LabelSelector: "a"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'->>'a'<>null`,
},
{
name: "in selector",
opts: v1.ListOptions{LabelSelector: "env in (a)"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'->>'env'in('a')`,
},
{
name: "not in selector",
opts: v1.ListOptions{LabelSelector: "env notin (a)"},
expectedSelectable: true,
expectedLabelSearch: `payload->'metadata'->'labels'->>'env'<>'a'`,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_, labelSearch, selectable, err := ToLabelSearch(c.opts)
if c.expectedSelectable != selectable {
t.Errorf("expected %v, but got %v", c.expectedSelectable, selectable)
}

if c.expectedLabelSearch != labelSearch {
t.Errorf("expected %s, but got %s", c.expectedLabelSearch, labelSearch)
}

if err != nil {
t.Errorf("expected error %v", err)
}
})
}
}
18 changes: 13 additions & 5 deletions pkg/client/cloudevents/grpcsource/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
Expand All @@ -17,16 +18,18 @@ type workWatcher struct {
done chan struct{}
stopped bool

namespace string
namespace string
labelSelector labels.Selector
}

var _ watch.Interface = &workWatcher{}

func newWorkWatcher(namespace string) *workWatcher {
func newWorkWatcher(namespace string, labelSelector labels.Selector) *workWatcher {
return &workWatcher{
result: make(chan watch.Event),
done: make(chan struct{}),
namespace: namespace,
result: make(chan watch.Event),
done: make(chan struct{}),
namespace: namespace,
labelSelector: labelSelector,
}
}

Expand Down Expand Up @@ -68,6 +71,11 @@ func (w *workWatcher) Receive(evt watch.Event) {
return
}

if !w.labelSelector.Matches(labels.Set(work.GetLabels())) {
klog.V(4).Infof("ignore the label unmatched work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace)
return
}

w.result <- evt
}

Expand Down
Loading

0 comments on commit d5654af

Please sign in to comment.