Skip to content

Commit

Permalink
feat(backend): authorize readArtifacts and ReportMetrics endpoints (k…
Browse files Browse the repository at this point in the history
…ubeflow#7819)

* Authorize readArtifacts and ReportMetrics endpoints

New Verbs (reportMetrics and readArtifact) are added to ClusterRole with name: aggregate-to-kubeflow-pipelines-edit

Signed-off-by: Diana Atanasova <[email protected]>

* Add authorization when Persistent Agent communicate with the api-server

Persistent Agent authorize itself based ot the namespace and the current user

Fixes: kubeflow#7818

* Update persistence_agent.csv license file

Signed-off-by: Diana Atanasova <[email protected]>

* Fix lexical error in persistent agent cluster role

Signed-off-by: Diana Atanasova <[email protected]>

* Fix integration tests/Fix MULTIUSER= false usecase

Cover MULTIUSER=false usecase/Standalone pipeline installation.
In this case the namespace doesn't have `user` annotation and
there is no need to provide `kubeflow-userid` Header when making
a request against kfp-api-server

Signed-off-by: Diana Atanasova <[email protected]>

* rebase: fix conflixt in license file

Signed-off-by: Diana Atanasova <[email protected]>

* rebase add new line in the end of licensing file

Signed-off-by: Diana Atanasova <[email protected]>
  • Loading branch information
difince authored and jlyaoyuli committed Jan 5, 2023
1 parent 262cbc9 commit a50fc1d
Show file tree
Hide file tree
Showing 24 changed files with 695 additions and 84 deletions.
85 changes: 85 additions & 0 deletions backend/src/agent/persistence/client/fake_namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package client

import (
"context"
"errors"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
corev1 "k8s.io/client-go/applyconfigurations/core/v1"
)

type FakeNamespaceClient struct {
namespace string
user string
}

func (f *FakeNamespaceClient) SetReturnValues(namespace string, user string) {
f.namespace = namespace
f.user = user
}

func (f FakeNamespaceClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Namespace, error) {
if f.namespace == name && len(f.user) != 0 {
ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{
Namespace: f.namespace,
Annotations: map[string]string{
"owner": f.user,
},
}}
return &ns, nil
}
return nil, errors.New("failed to get namespace")
}

func (f FakeNamespaceClient) Create(ctx context.Context, namespace *v1.Namespace, opts metav1.CreateOptions) (*v1.Namespace, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Update(ctx context.Context, namespace *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) UpdateStatus(ctx context.Context, namespace *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
glog.Error("This fake method is not yet implemented.")
return nil
}

func (f FakeNamespaceClient) List(ctx context.Context, opts metav1.ListOptions) (*v1.NamespaceList, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Namespace, err error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Apply(ctx context.Context, namespace *corev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Namespace, err error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) ApplyStatus(ctx context.Context, namespace *corev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Namespace, err error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (f FakeNamespaceClient) Finalize(ctx context.Context, item *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}
87 changes: 87 additions & 0 deletions backend/src/agent/persistence/client/kubernetes_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package client

import (
"context"
"fmt"
"os"
"time"

"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"

"github.com/kubeflow/pipelines/backend/src/common/util"
)

type KubernetesCoreInterface interface {
NamespaceClient() v1.NamespaceInterface
GetNamespaceOwner(namespace string) (string, error)
}

type KubernetesCore struct {
coreV1Client v1.CoreV1Interface
}

func (c *KubernetesCore) NamespaceClient() v1.NamespaceInterface {
return c.coreV1Client.Namespaces()
}

func (c *KubernetesCore) GetNamespaceOwner(namespace string) (string, error) {
if os.Getenv("MULTIUSER") == "" || os.Getenv("MULTIUSER") == "false" {
return "", nil
}
ns, err := c.NamespaceClient().Get(context.Background(), namespace, metav1.GetOptions{})
if err != nil {
return "", errors.Wrapf(err, fmt.Sprintf("failed to get namespace '%v'", namespace))
}
owner, ok := ns.Annotations["owner"]
if !ok {
return "", errors.New(fmt.Sprintf("namespace '%v' has no owner in the annotations", namespace))
}
return owner, nil
}

func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) {
clientSet, err := getKubernetesClientset(clientParams)
if err != nil {
return nil, err
}
return &KubernetesCore{clientSet.CoreV1()}, nil
}

// CreateKubernetesCoreOrFatal creates a new client for the Kubernetes pod.
func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration, clientParams util.ClientParameters) KubernetesCoreInterface {
var client KubernetesCoreInterface
var err error
var operation = func() error {
client, err = createKubernetesCore(clientParams)
return err
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err = backoff.Retry(operation, b)

if err != nil {
glog.Fatalf("Failed to create namespace client. Error: %v", err)
}
return client
}

func getKubernetesClientset(clientParams util.ClientParameters) (*kubernetes.Clientset, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client.")
}
restConfig.QPS = float32(clientParams.QPS)
restConfig.Burst = clientParams.Burst

clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.")
}
return clientSet, nil
}
37 changes: 37 additions & 0 deletions backend/src/agent/persistence/client/kubernetes_core_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"context"
"errors"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

type KubernetesCoreFake struct {
coreV1ClientFake *FakeNamespaceClient
}

func (c *KubernetesCoreFake) NamespaceClient() v1.NamespaceInterface {
return c.coreV1ClientFake
}

func (c *KubernetesCoreFake) GetNamespaceOwner(namespace string) (string, error) {
ns, err := c.NamespaceClient().Get(context.Background(), namespace, metav1.GetOptions{})
if err != nil {
return "", err
}
owner, ok := ns.Annotations["owner"]
if !ok {
return "", errors.New(fmt.Sprintf("namespace '%v' has no owner in the annotations", namespace))
}
return owner, nil
}

func NewKubernetesCoreFake() *KubernetesCoreFake {
return &KubernetesCoreFake{&FakeNamespaceClient{}}
}
func (c *KubernetesCoreFake) Set(namespaceToReturn string, userToReturn string) {
c.coreV1ClientFake.SetReturnValues(namespaceToReturn, userToReturn)
}
41 changes: 35 additions & 6 deletions backend/src/agent/persistence/client/pipeline_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package client
import (
"context"
"fmt"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"google.golang.org/grpc/metadata"
"os"
"time"

api "github.com/kubeflow/pipelines/backend/api/go_client"
Expand All @@ -33,8 +36,8 @@ const (
type PipelineClientInterface interface {
ReportWorkflow(workflow *util.Workflow) error
ReportScheduledWorkflow(swf *util.ScheduledWorkflow) error
ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error)
ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error)
ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error)
ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error)
}

type PipelineClient struct {
Expand Down Expand Up @@ -139,8 +142,13 @@ func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) er

// ReadArtifact reads artifact content from run service. If the artifact is not present, returns
// nil response.
func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) {
pctx := context.Background()
if user != "" {
pctx = metadata.AppendToOutgoingContext(pctx, getKubeflowUserIDHeader(),
getKubeflowUserIDPrefix()+user)
}
ctx, cancel := context.WithTimeout(pctx, time.Minute)
defer cancel()

response, err := p.runServiceClient.ReadArtifact(ctx, request)
Expand All @@ -153,8 +161,13 @@ func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest) (*api.Re
}

// ReportRunMetrics reports run metrics to run service.
func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error) {
pctx := context.Background()
if user != "" {
pctx = metadata.AppendToOutgoingContext(pctx, getKubeflowUserIDHeader(),
getKubeflowUserIDPrefix()+user)
}
ctx, cancel := context.WithTimeout(pctx, time.Minute)
defer cancel()

response, err := p.runServiceClient.ReportRunMetrics(ctx, request)
Expand All @@ -166,3 +179,19 @@ func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest)
}
return response, nil
}

//TODO use config file & viper and "github.com/kubeflow/pipelines/backend/src/apiserver/common.GetKubeflowUserIDHeader()"
func getKubeflowUserIDHeader() string {
if value, ok := os.LookupEnv(common.KubeflowUserIDHeader); ok {
return value
}
return common.GoogleIAPUserIdentityHeader
}

//TODO use of viper & viper and "github.com/kubeflow/pipelines/backend/src/apiserver/common.GetKubeflowUserIDPrefix()"
func getKubeflowUserIDPrefix() string {
if value, ok := os.LookupEnv(common.KubeflowUserIDPrefix); ok {
return value
}
return common.GoogleIAPUserIdentityPrefix
}
7 changes: 5 additions & 2 deletions backend/src/agent/persistence/client/pipeline_client_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ func (p *PipelineClientFake) ReportScheduledWorkflow(swf *util.ScheduledWorkflow
return nil
}

func (p *PipelineClientFake) ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) {
func (p *PipelineClientFake) ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) {
if p.err != nil {
return nil, p.err
}
p.readArtifactRequest = request
return p.artifacts[request.String()], nil
}

func (p *PipelineClientFake) ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error) {
func (p *PipelineClientFake) ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error) {
p.reportedMetricsRequest = request
return p.reportMetricsResponseStub, p.reportMetricsErrorStub
}
Expand Down
9 changes: 9 additions & 0 deletions backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ const (
clientBurstFlagName = "clientBurst"
)

const (
DefaultConnectionTimeout = 6 * time.Minute
)

func main() {
flag.Parse()

Expand Down Expand Up @@ -95,6 +99,10 @@ func main() {
swfInformerFactory = swfinformers.NewFilteredSharedInformerFactory(swfClient, time.Second*30, namespace, nil)
workflowInformerFactory = workflowinformers.NewFilteredSharedInformerFactory(workflowClient, time.Second*30, namespace, nil)
}
k8sCoreClient := client.CreateKubernetesCoreOrFatal(DefaultConnectionTimeout, util.ClientParameters{
QPS: clientQPS,
Burst: clientBurst,
})

pipelineClient, err := client.NewPipelineClient(
initializeTimeout,
Expand All @@ -111,6 +119,7 @@ func main() {
swfInformerFactory,
workflowInformerFactory,
pipelineClient,
k8sCoreClient,
util.NewRealTime())

go swfInformerFactory.Start(stopCh)
Expand Down
3 changes: 2 additions & 1 deletion backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
k8sCoreClient client.KubernetesCoreInterface,
time util.TimeInterface) *PersistenceAgent {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()
Expand All @@ -64,7 +65,7 @@ func NewPersistenceAgent(

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind,
workflowInformer.Informer(), true,
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))
worker.NewWorkflowSaver(workflowClient, pipelineClient, k8sCoreClient, ttlSecondsAfterWorkflowFinish))

agent := &PersistenceAgent{
swfClient: swfClient,
Expand Down
Loading

0 comments on commit a50fc1d

Please sign in to comment.