diff --git a/pkg/flink/resources.go b/pkg/flink/resources.go index f708bd7..7c97d28 100644 --- a/pkg/flink/resources.go +++ b/pkg/flink/resources.go @@ -70,6 +70,15 @@ func (gcsDownloader) Container(artifacts []string) corev1.Container { type DownloaderRegistry map[string]Downloader +func (dr DownloaderRegistry) GetDownloader(scheme string) (Downloader, error) { + d, ok := dr[scheme] + if !ok { + return nil, fmt.Errorf("downloader not implemented for scheme: %s", scheme) + } + + return d, nil +} + var downloaderRegistry = DownloaderRegistry{ "": localDownloader{}, "gs": gcsDownloader{}, @@ -219,12 +228,15 @@ func (fc *FlinkCluster) updateJobSpec(taskCtx FlinkTaskContext) error { volumeName := fmt.Sprintf("%s-jars", taskCtx.ClusterName.String()) out.Volumes = append(out.Volumes, corev1.Volume{Name: volumeName}) out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{Name: volumeName, MountPath: jarsVolumePath}) + out.JarFile = defaultJarFile for scheme, urls := range groupBy { - out.JarFile = defaultJarFile - out.InitContainers = append(out.InitContainers, downloaderRegistry[scheme].Container(urls)) + downloader, err := downloaderRegistry.GetDownloader(scheme) + if err != nil { + return err + } + out.InitContainers = append(out.InitContainers, downloader.Container(urls)) } - out.InitContainers = append(out.InitContainers, artifactZip) return nil @@ -263,7 +275,9 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl cluster.updateJobManagerSpec(taskCtx) cluster.updateTaskManagerSpec(taskCtx) - cluster.updateJobSpec(taskCtx) + if err := cluster.updateJobSpec(taskCtx); err != nil { + return nil, err + } // fill in defaults resource := flinkOp.FlinkCluster(cluster) diff --git a/pkg/flink/resources_test.go b/pkg/flink/resources_test.go index a75298d..82523c7 100644 --- a/pkg/flink/resources_test.go +++ b/pkg/flink/resources_test.go @@ -281,3 +281,21 @@ func TestBuildFlinkClusterSpecJobCommand(t *testing.T) { } assert.Assert(t, reflect.DeepEqual(initCont.Args, args)) } + +func TestBuildFlinkClusterWithUnsupportedSchemeJar(t *testing.T) { + job := flinkIdl.FlinkJob{ + JarFiles: []string{"http://bucket.com/artifact0.jar"}, + } + config := GetFlinkConfig() + + flinkCtx := FlinkTaskContext{ + ClusterName: ClusterName("generated-name"), + Namespace: "test-namespace", + Annotations: make(map[string]string), + Labels: make(map[string]string), + Job: job, + } + + _, err := NewFlinkCluster(config, flinkCtx) + assert.Error(t, err, "downloader not implemented for scheme: http") +}