Skip to content

Commit

Permalink
Handle downloader not implemented (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored May 12, 2021
1 parent c6d8ff7 commit 00ae1af
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
22 changes: 18 additions & 4 deletions pkg/flink/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ func (gcsDownloader) Container(artifacts []string) corev1.Container {

type DownloaderRegistry map[string]Downloader

func (dr DownloaderRegistry) GetDownloader(scheme string) (Downloader, error) {
d, ok := dr[scheme]
if !ok {
return nil, fmt.Errorf("downloader not implemented for scheme: %s", scheme)
}

return d, nil
}

var downloaderRegistry = DownloaderRegistry{
"": localDownloader{},
"gs": gcsDownloader{},
Expand Down Expand Up @@ -219,12 +228,15 @@ func (fc *FlinkCluster) updateJobSpec(taskCtx FlinkTaskContext) error {
volumeName := fmt.Sprintf("%s-jars", taskCtx.ClusterName.String())
out.Volumes = append(out.Volumes, corev1.Volume{Name: volumeName})
out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{Name: volumeName, MountPath: jarsVolumePath})
out.JarFile = defaultJarFile

for scheme, urls := range groupBy {
out.JarFile = defaultJarFile
out.InitContainers = append(out.InitContainers, downloaderRegistry[scheme].Container(urls))
downloader, err := downloaderRegistry.GetDownloader(scheme)
if err != nil {
return err
}
out.InitContainers = append(out.InitContainers, downloader.Container(urls))
}

out.InitContainers = append(out.InitContainers, artifactZip)

return nil
Expand Down Expand Up @@ -263,7 +275,9 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl

cluster.updateJobManagerSpec(taskCtx)
cluster.updateTaskManagerSpec(taskCtx)
cluster.updateJobSpec(taskCtx)
if err := cluster.updateJobSpec(taskCtx); err != nil {
return nil, err
}

// fill in defaults
resource := flinkOp.FlinkCluster(cluster)
Expand Down
18 changes: 18 additions & 0 deletions pkg/flink/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,21 @@ func TestBuildFlinkClusterSpecJobCommand(t *testing.T) {
}
assert.Assert(t, reflect.DeepEqual(initCont.Args, args))
}

func TestBuildFlinkClusterWithUnsupportedSchemeJar(t *testing.T) {
job := flinkIdl.FlinkJob{
JarFiles: []string{"http://bucket.com/artifact0.jar"},
}
config := GetFlinkConfig()

flinkCtx := FlinkTaskContext{
ClusterName: ClusterName("generated-name"),
Namespace: "test-namespace",
Annotations: make(map[string]string),
Labels: make(map[string]string),
Job: job,
}

_, err := NewFlinkCluster(config, flinkCtx)
assert.Error(t, err, "downloader not implemented for scheme: http")
}

0 comments on commit 00ae1af

Please sign in to comment.