Skip to content

Commit

Permalink
support listing works with its labels
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 20, 2024
1 parent f9be517 commit de64f87
Show file tree
Hide file tree
Showing 19 changed files with 744 additions and 650 deletions.
4 changes: 2 additions & 2 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewAgentCommand() *cobra.Command {
agentOption.MaxJSONRawLength = maxJSONRawLength
agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"}
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)

cmd := cmdConfig.NewCommandWithContext(context.TODO())
Expand Down Expand Up @@ -68,6 +68,6 @@ func NewAgentCommand() *cobra.Command {
func addFlags(fs *pflag.FlagSet) {
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
fs.BoolVar(&commonOptions.CommonOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
true, "Disable leader election.")
}
2 changes: 1 addition & 1 deletion examples/manifestworkclient/client-a/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
WithClientID(fmt.Sprintf("%s-client-a", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithWorkClientWatcherStore(grpcsource.NewRESTFulAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
WithClientID(fmt.Sprintf("%s-client-b", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithWorkClientWatcherStore(grpcsource.NewRESTFulAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ require (
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/openshift-online/ocm-sdk-go v0.1.334
github.com/openshift-online/ocm-common v0.0.0-20240611095658-2ad1e6fae94d
github.com/openshift-online/ocm-sdk-go v0.1.421
github.com/prometheus/client_golang v1.18.0
github.com/segmentio/ksuid v1.0.2
github.com/spf13/cobra v1.8.0
Expand All @@ -44,8 +45,8 @@ require (
k8s.io/component-base v0.29.3
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd
)

require (
Expand Down Expand Up @@ -115,6 +116,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 // indirect
github.com/smartystreets/goconvey v1.8.1 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
Expand All @@ -131,13 +133,13 @@ require (
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.17.0 // indirect
Expand All @@ -163,3 +165,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/openshift-online/ocm-common => github.com/ziccardi/ocm-common v0.0.0-20240619140356-177492f621b3
403 changes: 24 additions & 379 deletions go.sum

Large diffs are not rendered by default.

44 changes: 41 additions & 3 deletions pkg/api/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
workv1 "open-cluster-management.io/api/work/v1"
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

type ResourceType string
Expand Down Expand Up @@ -93,9 +94,35 @@ type ResourcePatchRequest struct{}

// JSONMAPToCloudEvent converts a JSONMap (resource manifest or status) to a CloudEvent
func JSONMAPToCloudEvent(res datatypes.JSONMap) (*cloudevents.Event, error) {
resJSON, err := res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
var err error
var resJSON []byte

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents require its extension value as string, so we need convert the metadata object
// to string back

// ensure the original resource will be not changed
resCopy := datatypes.JSONMap{}
for key, value := range res {
resCopy[key] = value
}

metaJson, err := json.Marshal(metadata)
if err != nil {
return nil, err
}

resCopy[codec.ExtensionWorkMeta] = string(metaJson)

resJSON, err = resCopy.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
} else {
resJSON, err = res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
}

evt := &cloudevents.Event{}
Expand All @@ -118,6 +145,17 @@ func CloudEventToJSONMap(evt *cloudevents.Event) (datatypes.JSONMap, error) {
return nil, fmt.Errorf("failed to unmarshal cloudevent JSON to JSONMAP: %v", err)
}

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents treat its extension value as string, so we need convert metadata extension
// to an object for supporting to query the resources with metadata
objectMeta := map[string]any{}

if err := json.Unmarshal([]byte(fmt.Sprintf("%s", metadata)), &objectMeta); err != nil {
return nil, err
}
res[codec.ExtensionWorkMeta] = objectMeta
}

return res, nil
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/api/resource_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,55 @@ import (
"encoding/json"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/types"
"gorm.io/datatypes"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)

func TestConvertCloudEventAndJSONMap(t *testing.T) {
expectedMeta := "{\"name\":\"work\",\"namespace\":\"bfb16f22\",\"uid\":\"4b5a1342\",\"resourceVersion\":\"0\",\"creationTimestamp\":null}"

evt := cloudevents.NewEvent()
evt.SetSource("source1")
evt.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.status.test")
evt.SetExtension("resourceid", "test")
evt.SetExtension("resourceversion", "13")
evt.SetExtension("metadata", expectedMeta)
if err := evt.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundleStatus{
Conditions: []metav1.Condition{
{
Type: "Test",
Status: metav1.ConditionTrue,
},
},
}); err != nil {
t.Fatal(err)
}

jsonMap, err := CloudEventToJSONMap(&evt)
if err != nil {
t.Fatal(err)
}

convertedEvt, err := JSONMAPToCloudEvent(jsonMap)
if err != nil {
t.Fatal(err)
}

metadataExt := convertedEvt.Extensions()["metadata"]
metadata, err := types.ToString(metadataExt)
if err != nil {
t.Fatal(err)
}

if metadata != expectedMeta {
t.Errorf("expected %s, but got %s", expectedMeta, metadata)
}
}

func TestEncodeManifest(t *testing.T) {
cases := []struct {
name string
Expand Down
37 changes: 37 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package grpcsource

import (
"encoding/json"
"fmt"
"strings"

"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)
Expand Down Expand Up @@ -86,6 +90,39 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
return work, nil
}

func ToLabelSearch(opts metav1.ListOptions) (string, bool, error) {
if len(opts.LabelSelector) == 0 {
return "", false, nil
}

labelSelector, err := labels.Parse(opts.LabelSelector)
if err != nil {
return "", false, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err)
}

requirements, selectable := labelSelector.Requirements()
if !selectable {
return "", false, nil
}

labels := []string{}
for _, requirement := range requirements {
if requirement.Operator() != selection.Equals {
// TODO support more operators
return "", false, fmt.Errorf("unsupported operator %s", requirement.Operator())
}

values := requirement.Values()
if len(values) != 1 {
return "", false, fmt.Errorf("")
}

labels = append(labels, fmt.Sprintf(`"%s":"%s"`, requirement.Key(), values.List()[0]))
}

return fmt.Sprintf(`payload -> 'metadata' @> '{"labels":{%s}}'`, strings.Join(labels, ",")), true, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
65 changes: 65 additions & 0 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package grpcsource

import (
"fmt"
"testing"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
Expand Down Expand Up @@ -130,3 +132,66 @@ func TestToManifestWork(t *testing.T) {
})
}
}

func TestToLabelSearch(t *testing.T) {
cases := []struct {
name string
opts v1.ListOptions
expectedSelectable bool
expectedLabelSearch string
expectedErr error
}{
{
name: "no label selector",
opts: v1.ListOptions{},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: nil,
},
{
name: "selector everything",
opts: v1.ListOptions{LabelSelector: labels.Everything().String()},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: nil,
},
{
name: "unsupported selector operator",
opts: v1.ListOptions{LabelSelector: "a"},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: fmt.Errorf("unsupported operator exists"),
},
{
name: "one selector",
opts: v1.ListOptions{LabelSelector: "a=b"},
expectedSelectable: true,
expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\"}}}]'",
expectedErr: nil,
},
{
name: "multiple selector",
opts: v1.ListOptions{LabelSelector: "a=b,c=d"},
expectedSelectable: true,
expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\",\"c\":\"d\"}}}]'",
expectedErr: nil,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
labelSearch, selectable, err := ToLabelSearch(c.opts)
if c.expectedSelectable != selectable {
t.Errorf("expected %v, but got %v", c.expectedSelectable, selectable)
}

if c.expectedLabelSearch != labelSearch {
t.Errorf("expected %s, but got %s", c.expectedLabelSearch, labelSearch)
}

if err != nil && err.Error() != c.expectedErr.Error() {
t.Errorf("expected %v, but got %v", c.expectedErr, err)
}
})
}
}
Loading

0 comments on commit de64f87

Please sign in to comment.