Skip to content

Commit

Permalink
Add job artifacts as a general available env var for JM and TM (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Mar 21, 2022
1 parent f743b25 commit e3b082d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
16 changes: 11 additions & 5 deletions pkg/flink/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package flink

import (
"bytes"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
"text/template"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils"
Expand All @@ -34,8 +35,9 @@ import (
)

var (
containerTmpl = template.New("container-template").Funcs(template.FuncMap{"join": strings.Join})
flinkPropertiesTmpl = template.New("flink-properties-template").Funcs(template.FuncMap{"join": strings.Join})
containerTmpl = template.New("container-template").Funcs(template.FuncMap{"join": strings.Join})
flinkPropertiesTmpl = template.New("flink-properties-template").Funcs(template.FuncMap{"join": strings.Join})
stagedJarsEnvVarName = "STAGED_JARS"
)

type ContainerTemplateData struct {
Expand Down Expand Up @@ -298,6 +300,10 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl
Kind: KindFlinkCluster,
APIVersion: flinkOp.GroupVersion.String(),
}
cluster.Spec.EnvVars = []corev1.EnvVar{{
Name: stagedJarsEnvVarName,
Value: strings.Join(taskCtx.Job.JarFiles, " "),
}}

cluster.updateFlinkProperties(config, taskCtx)

Expand Down
15 changes: 12 additions & 3 deletions pkg/flink/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package flink

import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"reflect"
"strings"
"testing"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"

flinkOp "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
flinkIdl "github.com/spotify/flyte-flink-plugin/gen/pb-go/flyteidl-flink"
"gotest.tools/assert"
Expand Down Expand Up @@ -79,6 +81,13 @@ func TestBuildFlinkClusterSpecValid(t *testing.T) {

assert.Equal(t, string(job.CleanupPolicy.AfterJobSucceeds), flinkOp.CleanupActionDeleteCluster)
assert.Equal(t, cluster.Spec.FlinkProperties["metrics.reporter.promgateway.groupingKey"], "namespace=test-namespace;cluster=generated-name;execution_id=1")

assert.Assert(t, len(cluster.Spec.EnvVars) == 1, "EnvVars should contain only 1 env")
expectedEnvVars := []corev1.EnvVar{{
Name: stagedJarsEnvVarName,
Value: strings.Join(jobIdl.JarFiles, " "),
}}
assert.Equal(t, cluster.Spec.EnvVars[0], expectedEnvVars[0])
}

func TestWithPersistentVolume(t *testing.T) {
Expand Down Expand Up @@ -294,7 +303,7 @@ func TestBuildAnnotationPatch(t *testing.T) {
assert.DeepEqual(
t,
jsonData["metadata"].(map[string]interface{})["annotations"].(map[string]interface{}),
map[string]interface{} {"testKey": "testValue"},
map[string]interface{}{"testKey": "testValue"},
)

assert.NilError(t, err)
Expand Down

0 comments on commit e3b082d

Please sign in to comment.