Skip to content

Commit

Permalink
Flink supports k8s operator submit task
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Sep 8, 2023
1 parent 5ca6726 commit 20814ea
Show file tree
Hide file tree
Showing 55 changed files with 3,248 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ public CompletableFuture<String> stopWithSavepoint(
return flinkShims.stopWithSavepoint(clusterClient, jobId, advanceToEndOfEventTime, savepoint);
}

public FlinkShims getFlinkShims() {
return flinkShims;
}

// ~ Inner Class -------------------------------------------------------------------------------

/** Builder for {@link ExecutionContext}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.engineconnplugin.flink.client.deployment;

import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.config.FlinkExecutionTargetType;

import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
Expand All @@ -40,6 +41,8 @@ public static ClusterDescriptorAdapter create(ExecutionContext executionContext)
new KubernetesApplicationClusterDescriptorAdapter(executionContext);
} else if (KubernetesDeploymentTarget.SESSION.getName().equals(flinkDeploymentTarget)) {
clusterDescriptorAdapter = new KubernetesSessionClusterDescriptorAdapter(executionContext);
} else if (FlinkExecutionTargetType.KUBERNETES_OPERATOR().equals(flinkDeploymentTarget)) {
clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
}
return clusterDescriptorAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.client.deployment;

import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.*;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesOperatorClusterDescriptorAdapter
extends AbstractApplicationClusterDescriptorAdapter {

private static final Logger logger =
LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);

KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) {
super(executionContext);
}

public void deployCluster(String[] programArguments, String applicationClassName)
throws JobExecutionException {
FlinkKubernetesOperatorConfig flinkKubernetesOperatorConfig =
convertFlinkConfig(this.executionContext.getFlinkConfig());
this.executionContext
.getFlinkShims()
.deployKubernetesOperator(
programArguments, applicationClassName, flinkKubernetesOperatorConfig);
}

public boolean initJobId() {
try {
this.executionContext.getFlinkShims().startFlinkKubernetesOperatorWatcher();
} catch (Exception e) {
try {
// Prevent watch interruption due to network interruption.Restart Watcher.
Thread.sleep(5000);
this.executionContext.getFlinkShims().startFlinkKubernetesOperatorWatcher();
} catch (InterruptedException interruptedException) {
logger.error("Use k8s watch obtain the status failed");
}
}
return null != this.executionContext.getFlinkShims().getJobId();
}

@Override
public JobID getJobId() {
return this.executionContext.getFlinkShims().getJobId();
}

@Override
public JobStatus getJobStatus() throws JobExecutionException {
return this.executionContext.getFlinkShims().getJobStatus();
}

@Override
public void cancelJob() throws JobExecutionException {
this.executionContext.getFlinkShims().close();
}

@Override
public void close() {
this.executionContext.getFlinkShims().close();
}

@Override
public boolean isGloballyTerminalState() {
return false;
}

private FlinkKubernetesOperatorConfig convertFlinkConfig(Configuration flinkConfig) {

List<String> pipelineJars =
flinkConfig.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

String flinkAppName =
flinkConfig.getString(
FlinkEnvConfiguration.FLINK_APP_NAME().key(),
FlinkEnvConfiguration.FLINK_APP_NAME().defaultValue());

Map<String, String> map = flinkConfig.toMap();
map.remove(FlinkEnvConfiguration.FLINK_APP_NAME().key());
map.remove(KubernetesConfigOptions.NAMESPACE.key());
map.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
map.remove(TaskManagerOptions.BIND_HOST.key());
map.remove(TaskManagerOptions.HOST.key());
map.remove(JobManagerOptions.ADDRESS.key());
map.remove(JobManagerOptions.PORT.key());
map.remove(JobManagerOptions.BIND_HOST.key());
map.remove(RestOptions.BIND_ADDRESS.key());
map.remove(RestOptions.ADDRESS.key());
map.remove(DeploymentOptions.TARGET.key());
map.remove(DeploymentOptionsInternal.CONF_DIR.key());

return FlinkKubernetesOperatorConfig.Builder()
.k8sNamespace(flinkConfig.getOptional(KubernetesConfigOptions.NAMESPACE).orElse("default"))
.k8sConfigFile(
flinkConfig
.getOptional(KubernetesConfigOptions.KUBE_CONFIG_FILE)
.orElse(System.getProperty("user.home").concat("/.kube/config")))
.k8sImage(
flinkConfig
.getOptional(KubernetesConfigOptions.CONTAINER_IMAGE)
.orElse("flink:1.16-scala_2.12-java8"))
.jobmanagerMemory(
flinkConfig.getOptional(JobManagerOptions.TOTAL_PROCESS_MEMORY).get().getMebiBytes()
+ "M")
.taskmanagerMemory(
flinkConfig.getOptional(TaskManagerOptions.TOTAL_PROCESS_MEMORY).get().getMebiBytes()
+ "M")
.jobmanagerCpu(
String.valueOf(flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU)))
.taskmanagerCpu(
String.valueOf(flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU)))
.jar(pipelineJars.get(0))
.flinkConfiguration(map)
.flinkAppName(flinkAppName.toLowerCase())
.parallelism(flinkConfig.getOptional(CoreOptions.DEFAULT_PARALLELISM).orElse(1))
.k8sServiceAccount(
flinkConfig
.getOptional(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
.orElse("default"))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ object FlinkEnvConfiguration {
val FLINK_1_16_2_VERSION = "1.16.2"
val FLINK_VERSION = CommonVars("flink.version", FLINK_1_16_2_VERSION)

val FLINK_APP_NAME = CommonVars[String]("flink.app.name", "Linkis-EngineConn-Flink")

val FLINK_HOME =
CommonVars("flink.home", CommonVars(FLINK_HOME_ENV, "/appcom/Install/flink").getValue)

Expand Down Expand Up @@ -113,7 +115,7 @@ object FlinkEnvConfiguration {
val FLINK_KUBERNETES_CONTAINER_IMAGE =
CommonVars(
"linkis.flink.kubernetes.container.image",
"apache/flink:1.12.2-scala_2.12-java8",
"flink:1.16-scala_2.12-java8",
"Image to use for Flink containers."
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ object FlinkExecutionTargetType {
) || targetType.equalsIgnoreCase(KUBERNETES_OPERATOR)
}

def isNativeKubernetesExecutionTargetType(targetType: String): Boolean = {
targetType.equalsIgnoreCase(KUBERNETES_APPLICATION) || targetType.equalsIgnoreCase(
KUBERNETES_SESSION
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class EnvironmentContext(

private var deploymentTarget: String = YarnDeploymentTarget.PER_JOB.getName

private var extraParams: util.Map[String, Any] = _
private var extraParams: util.Map[String, Any] = new util.HashMap[String, Any]()

def this(
defaultEnv: Environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
logger.info(
s"Application is started, applicationId: $getApplicationId, applicationURL: $getApplicationURL."
)
} else if (FlinkExecutionTargetType.isKubernetesExecutionTargetType(flinkDeploymentTarget)) {
} else if (
FlinkExecutionTargetType.isNativeKubernetesExecutionTargetType(flinkDeploymentTarget)
) {
if (null == clusterDescriptor.getKubernetesClusterID) {
throw new ExecutorInitException(KUBERNETES_IS_NULL.getErrorDesc)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,24 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
if (StringUtils.isNotBlank(flinkMainClassJar)) {
flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJar))
}

val jobmanagerCpu = LINKIS_FLINK_CLIENT_CORES
val taskmanagerCpu = LINKIS_FLINK_TASK_MANAGER_CPU_CORES.getValue(options)

flinkConfig.set(
KubernetesConfigOptions.JOB_MANAGER_CPU,
java.lang.Double.valueOf(jobmanagerCpu)
)
flinkConfig.set(
KubernetesConfigOptions.TASK_MANAGER_CPU,
java.lang.Double.valueOf(taskmanagerCpu)
)

if (FlinkExecutionTargetType.KUBERNETES_OPERATOR.equals(flinkExecutionTarget)) {
val flinkAppName = FLINK_APP_NAME.getValue(options)
flinkConfig.setString(FLINK_APP_NAME.key, flinkAppName);
}

}
context
}
Expand Down Expand Up @@ -425,7 +443,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
}
Environment.enrich(environmentContext.getDefaultEnv, properties, Collections.emptyMap())
case FlinkExecutionTargetType.YARN_APPLICATION |
FlinkExecutionTargetType.KUBERNETES_APPLICATION =>
FlinkExecutionTargetType.KUBERNETES_APPLICATION |
FlinkExecutionTargetType.KUBERNETES_OPERATOR =>
null
case t =>
logger.error(s"Not supported YarnDeploymentTarget ${t}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.engineconnplugin.flink.client.shims;

import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.*;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
Expand Down Expand Up @@ -489,4 +490,13 @@ void wrapClassLoader(Runnable runnable) {
runnable.run();
}
}

@Override
public void deployKubernetesOperator(
String[] programArguments,
String applicationClassName,
FlinkKubernetesOperatorConfig config) {
throw new UnsupportedOperationException(
"Flink-1.12.2 does not support operations related to flink kubernetes operator");
}
}
5 changes: 5 additions & 0 deletions linkis-engineconn-plugins/flink/flink-shims-1.16.2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<artifactId>flink-shaded-jackson</artifactId>
<version>2.12.4-15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-shaded</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.linkis.engineconnplugin.flink.client.shims;

import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig;
import org.apache.linkis.engineconnplugin.flink.client.shims.deployment.FlinkDeploymentOperatorClusterDescriptorAdapter;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
Expand Down Expand Up @@ -47,6 +51,9 @@

public class Flink1162Shims extends FlinkShims {

private FlinkDeploymentOperatorClusterDescriptorAdapter
flinkDeploymentOperatorClusterDescriptorAdapter;

public Flink1162Shims(String flinkVersion) {
super(flinkVersion);
}
Expand Down Expand Up @@ -177,4 +184,35 @@ private static Executor lookupExecutor(
"Could not instantiate the executor. Make sure a planner module is on the classpath", e);
}
}

@Override
public void deployKubernetesOperator(
String[] programArguments,
String applicationClassName,
FlinkKubernetesOperatorConfig config) {
flinkDeploymentOperatorClusterDescriptorAdapter =
new FlinkDeploymentOperatorClusterDescriptorAdapter(config);
flinkDeploymentOperatorClusterDescriptorAdapter.deployCluster(
programArguments, applicationClassName);
}

@Override
public void close() {
flinkDeploymentOperatorClusterDescriptorAdapter.close();
}

@Override
public JobID getJobId() {
return flinkDeploymentOperatorClusterDescriptorAdapter.getJobId();
}

@Override
public JobStatus getJobStatus() {
return flinkDeploymentOperatorClusterDescriptorAdapter.getJobStatus();
}

@Override
public void startFlinkKubernetesOperatorWatcher() {
flinkDeploymentOperatorClusterDescriptorAdapter.startFlinkKubernetesOperatorWatcher();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.client.shims.crds;

import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec;
import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.CommonStatus;

import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Namespaced;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.CustomResource;

/** Abstract base class Flink resources. */
public class AbstractFlinkResource<
SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
extends CustomResource<SPEC, STATUS> implements Namespaced {}
Loading

0 comments on commit 20814ea

Please sign in to comment.