diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java index 84b48131b1..3cb6a3f3f2 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java @@ -317,6 +317,10 @@ public CompletableFuture stopWithSavepoint( return flinkShims.stopWithSavepoint(clusterClient, jobId, advanceToEndOfEventTime, savepoint); } + public FlinkShims getFlinkShims() { + return flinkShims; + } + // ~ Inner Class ------------------------------------------------------------------------------- /** Builder for {@link ExecutionContext}. */ diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapterFactory.java index c7ed72baf9..380c9cdbac 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapterFactory.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapterFactory.java @@ -18,6 +18,7 @@ package org.apache.linkis.engineconnplugin.flink.client.deployment; import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext; +import org.apache.linkis.engineconnplugin.flink.config.FlinkExecutionTargetType; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; @@ -40,6 +41,8 @@ public static ClusterDescriptorAdapter create(ExecutionContext executionContext) new KubernetesApplicationClusterDescriptorAdapter(executionContext); } else if (KubernetesDeploymentTarget.SESSION.getName().equals(flinkDeploymentTarget)) { clusterDescriptorAdapter = new KubernetesSessionClusterDescriptorAdapter(executionContext); + } else if (FlinkExecutionTargetType.KUBERNETES_OPERATOR().equals(flinkDeploymentTarget)) { + clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext); } return clusterDescriptorAdapter; } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java new file mode 100755 index 0000000000..eac432d358 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.deployment; + +import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext; +import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig; +import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException; +import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.*; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesOperatorClusterDescriptorAdapter + extends AbstractApplicationClusterDescriptorAdapter { + + private static final Logger logger = + LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class); + + KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) { + super(executionContext); + } + + public void deployCluster(String[] programArguments, String applicationClassName) + throws JobExecutionException { + FlinkKubernetesOperatorConfig flinkKubernetesOperatorConfig = + convertFlinkConfig(this.executionContext.getFlinkConfig()); + this.executionContext + .getFlinkShims() + .deployKubernetesOperator( + programArguments, applicationClassName, flinkKubernetesOperatorConfig); + } + + public boolean initJobId() { + try { + this.executionContext.getFlinkShims().startFlinkKubernetesOperatorWatcher(); + } catch (Exception e) { + try { + // Prevent watch interruption due to network interruption.Restart Watcher. + Thread.sleep(5000); + this.executionContext.getFlinkShims().startFlinkKubernetesOperatorWatcher(); + } catch (InterruptedException interruptedException) { + logger.error("Use k8s watch obtain the status failed"); + } + } + return null != this.executionContext.getFlinkShims().getJobId(); + } + + @Override + public JobID getJobId() { + return this.executionContext.getFlinkShims().getJobId(); + } + + @Override + public JobStatus getJobStatus() throws JobExecutionException { + return this.executionContext.getFlinkShims().getJobStatus(); + } + + @Override + public void cancelJob() throws JobExecutionException { + this.executionContext.getFlinkShims().close(); + } + + @Override + public void close() { + this.executionContext.getFlinkShims().close(); + } + + @Override + public boolean isGloballyTerminalState() { + return false; + } + + private FlinkKubernetesOperatorConfig convertFlinkConfig(Configuration flinkConfig) { + + List pipelineJars = + flinkConfig.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList()); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + + String flinkAppName = + flinkConfig.getString( + FlinkEnvConfiguration.FLINK_APP_NAME().key(), + FlinkEnvConfiguration.FLINK_APP_NAME().defaultValue()); + + Map map = flinkConfig.toMap(); + map.remove(FlinkEnvConfiguration.FLINK_APP_NAME().key()); + map.remove(KubernetesConfigOptions.NAMESPACE.key()); + map.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key()); + map.remove(TaskManagerOptions.BIND_HOST.key()); + map.remove(TaskManagerOptions.HOST.key()); + map.remove(JobManagerOptions.ADDRESS.key()); + map.remove(JobManagerOptions.PORT.key()); + map.remove(JobManagerOptions.BIND_HOST.key()); + map.remove(RestOptions.BIND_ADDRESS.key()); + map.remove(RestOptions.ADDRESS.key()); + map.remove(DeploymentOptions.TARGET.key()); + map.remove(DeploymentOptionsInternal.CONF_DIR.key()); + + return FlinkKubernetesOperatorConfig.Builder() + .k8sNamespace(flinkConfig.getOptional(KubernetesConfigOptions.NAMESPACE).orElse("default")) + .k8sConfigFile( + flinkConfig + .getOptional(KubernetesConfigOptions.KUBE_CONFIG_FILE) + .orElse(System.getProperty("user.home").concat("/.kube/config"))) + .k8sImage( + flinkConfig + .getOptional(KubernetesConfigOptions.CONTAINER_IMAGE) + .orElse("flink:1.16-scala_2.12-java8")) + .jobmanagerMemory( + flinkConfig.getOptional(JobManagerOptions.TOTAL_PROCESS_MEMORY).get().getMebiBytes() + + "M") + .taskmanagerMemory( + flinkConfig.getOptional(TaskManagerOptions.TOTAL_PROCESS_MEMORY).get().getMebiBytes() + + "M") + .jobmanagerCpu( + String.valueOf(flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU))) + .taskmanagerCpu( + String.valueOf(flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU))) + .jar(pipelineJars.get(0)) + .flinkConfiguration(map) + .flinkAppName(flinkAppName.toLowerCase()) + .parallelism(flinkConfig.getOptional(CoreOptions.DEFAULT_PARALLELISM).orElse(1)) + .k8sServiceAccount( + flinkConfig + .getOptional(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT) + .orElse("default")) + .build(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 6b521dceed..4dda5a5366 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -28,6 +28,8 @@ object FlinkEnvConfiguration { val FLINK_1_16_2_VERSION = "1.16.2" val FLINK_VERSION = CommonVars("flink.version", FLINK_1_16_2_VERSION) + val FLINK_APP_NAME = CommonVars[String]("flink.app.name", "Linkis-EngineConn-Flink") + val FLINK_HOME = CommonVars("flink.home", CommonVars(FLINK_HOME_ENV, "/appcom/Install/flink").getValue) @@ -113,7 +115,7 @@ object FlinkEnvConfiguration { val FLINK_KUBERNETES_CONTAINER_IMAGE = CommonVars( "linkis.flink.kubernetes.container.image", - "apache/flink:1.12.2-scala_2.12-java8", + "flink:1.16-scala_2.12-java8", "Image to use for Flink containers." ) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkExecutionTargetType.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkExecutionTargetType.scala index ffb2becaeb..42fcfe92b2 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkExecutionTargetType.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkExecutionTargetType.scala @@ -39,4 +39,10 @@ object FlinkExecutionTargetType { ) || targetType.equalsIgnoreCase(KUBERNETES_OPERATOR) } + def isNativeKubernetesExecutionTargetType(targetType: String): Boolean = { + targetType.equalsIgnoreCase(KUBERNETES_APPLICATION) || targetType.equalsIgnoreCase( + KUBERNETES_SESSION + ) + } + } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala index 621cbea98e..48a583d16a 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala @@ -57,7 +57,7 @@ class EnvironmentContext( private var deploymentTarget: String = YarnDeploymentTarget.PER_JOB.getName - private var extraParams: util.Map[String, Any] = _ + private var extraParams: util.Map[String, Any] = new util.HashMap[String, Any]() def this( defaultEnv: Environment, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala index b79f1135bc..a4ce3c63c2 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala @@ -85,7 +85,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] logger.info( s"Application is started, applicationId: $getApplicationId, applicationURL: $getApplicationURL." ) - } else if (FlinkExecutionTargetType.isKubernetesExecutionTargetType(flinkDeploymentTarget)) { + } else if ( + FlinkExecutionTargetType.isNativeKubernetesExecutionTargetType(flinkDeploymentTarget) + ) { if (null == clusterDescriptor.getKubernetesClusterID) { throw new ExecutorInitException(KUBERNETES_IS_NULL.getErrorDesc) } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 1c6db3bba9..1b6b66d7b4 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -286,6 +286,24 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging if (StringUtils.isNotBlank(flinkMainClassJar)) { flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJar)) } + + val jobmanagerCpu = LINKIS_FLINK_CLIENT_CORES + val taskmanagerCpu = LINKIS_FLINK_TASK_MANAGER_CPU_CORES.getValue(options) + + flinkConfig.set( + KubernetesConfigOptions.JOB_MANAGER_CPU, + java.lang.Double.valueOf(jobmanagerCpu) + ) + flinkConfig.set( + KubernetesConfigOptions.TASK_MANAGER_CPU, + java.lang.Double.valueOf(taskmanagerCpu) + ) + + if (FlinkExecutionTargetType.KUBERNETES_OPERATOR.equals(flinkExecutionTarget)) { + val flinkAppName = FLINK_APP_NAME.getValue(options) + flinkConfig.setString(FLINK_APP_NAME.key, flinkAppName); + } + } context } @@ -425,7 +443,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } Environment.enrich(environmentContext.getDefaultEnv, properties, Collections.emptyMap()) case FlinkExecutionTargetType.YARN_APPLICATION | - FlinkExecutionTargetType.KUBERNETES_APPLICATION => + FlinkExecutionTargetType.KUBERNETES_APPLICATION | + FlinkExecutionTargetType.KUBERNETES_OPERATOR => null case t => logger.error(s"Not supported YarnDeploymentTarget ${t}.") diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.12.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1122Shims.java b/linkis-engineconn-plugins/flink/flink-shims-1.12.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1122Shims.java index 039bb761b4..9d2d4cba2c 100644 --- a/linkis-engineconn-plugins/flink/flink-shims-1.12.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1122Shims.java +++ b/linkis-engineconn-plugins/flink/flink-shims-1.12.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1122Shims.java @@ -18,6 +18,7 @@ package org.apache.linkis.engineconnplugin.flink.client.shims; import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment; +import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig; import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.*; import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary; import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException; @@ -489,4 +490,13 @@ void wrapClassLoader(Runnable runnable) { runnable.run(); } } + + @Override + public void deployKubernetesOperator( + String[] programArguments, + String applicationClassName, + FlinkKubernetesOperatorConfig config) { + throw new UnsupportedOperationException( + "Flink-1.12.2 does not support operations related to flink kubernetes operator"); + } } diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/pom.xml b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/pom.xml index 6505936f43..9daf553a10 100644 --- a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/pom.xml +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/pom.xml @@ -110,6 +110,11 @@ flink-shaded-jackson 2.12.4-15.0 + + org.apache.flink + flink-kubernetes-shaded + 1.2.0 + org.apache.flink flink-yarn diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1162Shims.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1162Shims.java index e83d1b800e..3e89f6349e 100644 --- a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1162Shims.java +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/Flink1162Shims.java @@ -17,7 +17,11 @@ package org.apache.linkis.engineconnplugin.flink.client.shims; +import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig; +import org.apache.linkis.engineconnplugin.flink.client.shims.deployment.FlinkDeploymentOperatorClusterDescriptorAdapter; + import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.SavepointFormatType; @@ -47,6 +51,9 @@ public class Flink1162Shims extends FlinkShims { + private FlinkDeploymentOperatorClusterDescriptorAdapter + flinkDeploymentOperatorClusterDescriptorAdapter; + public Flink1162Shims(String flinkVersion) { super(flinkVersion); } @@ -177,4 +184,35 @@ private static Executor lookupExecutor( "Could not instantiate the executor. Make sure a planner module is on the classpath", e); } } + + @Override + public void deployKubernetesOperator( + String[] programArguments, + String applicationClassName, + FlinkKubernetesOperatorConfig config) { + flinkDeploymentOperatorClusterDescriptorAdapter = + new FlinkDeploymentOperatorClusterDescriptorAdapter(config); + flinkDeploymentOperatorClusterDescriptorAdapter.deployCluster( + programArguments, applicationClassName); + } + + @Override + public void close() { + flinkDeploymentOperatorClusterDescriptorAdapter.close(); + } + + @Override + public JobID getJobId() { + return flinkDeploymentOperatorClusterDescriptorAdapter.getJobId(); + } + + @Override + public JobStatus getJobStatus() { + return flinkDeploymentOperatorClusterDescriptorAdapter.getJobStatus(); + } + + @Override + public void startFlinkKubernetesOperatorWatcher() { + flinkDeploymentOperatorClusterDescriptorAdapter.startFlinkKubernetesOperatorWatcher(); + } } diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/AbstractFlinkResource.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/AbstractFlinkResource.java new file mode 100644 index 0000000000..dc4b8ba630 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/AbstractFlinkResource.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.CommonStatus; + +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Namespaced; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.CustomResource; + +/** Abstract base class Flink resources. */ +public class AbstractFlinkResource< + SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus> + extends CustomResource implements Namespaced {} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/CrdConstants.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/CrdConstants.java new file mode 100644 index 0000000000..a0cc7d70c8 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/CrdConstants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +/** Constants used by the CRD. */ +public class CrdConstants { + public static final String API_GROUP = "flink.apache.org"; + public static final String API_VERSION = "v1beta1"; + public static final String KIND_SESSION_JOB = "FlinkSessionJob"; + public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment"; + + public static final String LABEL_TARGET_SESSION = "target.session"; + + public static final String EPHEMERAL_STORAGE = "ephemeral-storage"; +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeployment.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeployment.java new file mode 100644 index 0000000000..18195708e1 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeployment.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkDeploymentSpec; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.FlinkDeploymentStatus; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Namespaced; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Group; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Kind; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.ShortNames; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Version; + +/** Custom resource definition that represents both Application and Session deployments. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonDeserialize() +@Group(CrdConstants.API_GROUP) +@Version(CrdConstants.API_VERSION) +@ShortNames({"flinkdep"}) +@Kind(CrdConstants.KIND_FLINK_DEPLOYMENT) +public class FlinkDeployment + extends AbstractFlinkResource + implements Namespaced { + + @Override + public FlinkDeploymentStatus initStatus() { + return new FlinkDeploymentStatus(); + } + + @Override + public FlinkDeploymentSpec initSpec() { + return new FlinkDeploymentSpec(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeploymentList.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeploymentList.java new file mode 100644 index 0000000000..97e330fe54 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkDeploymentList.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.CustomResourceList; + +/** + * Multiple Flink deployments. Please do not delete. This class is used by downstream projects + * interacting with the FlinkDeployment CRD via the Fabric8 Java client. + */ +public class FlinkDeploymentList extends CustomResourceList { + public FlinkDeploymentList() {} +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJob.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJob.java new file mode 100644 index 0000000000..7f30fd4a7f --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJob.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkSessionJobSpec; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.FlinkSessionJobStatus; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Namespaced; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Group; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Kind; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.ShortNames; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.Version; + +/** Custom resource definition that represents a flink session job. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonDeserialize() +@Group(CrdConstants.API_GROUP) +@Version(CrdConstants.API_VERSION) +@ShortNames({"sessionjob"}) +@Kind(CrdConstants.KIND_SESSION_JOB) +public class FlinkSessionJob + extends AbstractFlinkResource + implements Namespaced { + + @Override + protected FlinkSessionJobStatus initStatus() { + return new FlinkSessionJobStatus(); + } + + @Override + public FlinkSessionJobSpec initSpec() { + return new FlinkSessionJobSpec(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJobList.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJobList.java new file mode 100644 index 0000000000..c0754c144b --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/FlinkSessionJobList.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds; + +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.CustomResourceList; + +/** + * Multiple Flink session jobs. Please do not delete. This class is used by downstream projects + * interacting with the FlinkSessionJobs CRD via the Fabric8 Java client. + */ +public class FlinkSessionJobList extends CustomResourceList { + public FlinkSessionJobList() {} +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/DiffType.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/DiffType.java new file mode 100644 index 0000000000..3ce71411dd --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/DiffType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff; + +/** Spec change type. */ +public enum DiffType { + + /** Ignorable spec change. */ + IGNORE, + /** Scalable spec change. */ + SCALE, + /** Upgradable spec change. */ + UPGRADE; + + public static DiffType max(DiffType left, DiffType right) { + return (left.ordinal() >= right.ordinal()) ? left : right; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/Diffable.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/Diffable.java new file mode 100644 index 0000000000..2b6b3b6341 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/Diffable.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff; + +/** + * {@link Diffable} classes can be compared with other {@link Diffable} objects for differences. + * + *

Inspired by: + * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/Diffable.java + */ +public interface Diffable {} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/SpecDiff.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/SpecDiff.java new file mode 100644 index 0000000000..ac64abf18f --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/diff/SpecDiff.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** Spec diff annotation. */ +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +public @interface SpecDiff { + DiffType value(); + + /** Spec diff config annotation. */ + @Target(ElementType.FIELD) + @Retention(RetentionPolicy.RUNTIME) + @interface Config { + Entry[] value(); + } + + /** Spec diff config annotation entry. */ + @Target(ElementType.FIELD) + @Retention(RetentionPolicy.RUNTIME) + @interface Entry { + String prefix(); + + DiffType type(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/lifecycle/ResourceLifecycleState.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/lifecycle/ResourceLifecycleState.java new file mode 100644 index 0000000000..bd8eda11f3 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/lifecycle/ResourceLifecycleState.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.lifecycle; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +/** Enum encapsulating the lifecycle state of a Flink resource. */ +public enum ResourceLifecycleState { + CREATED(false, "The resource was created in Kubernetes but not yet handled by the operator"), + SUSPENDED(true, "The resource (job) has been suspended"), + UPGRADING(false, "The resource is being upgraded"), + DEPLOYED( + false, + "The resource is deployed/submitted to Kubernetes, but it’s not yet considered to be stable and might be rolled back in the future"), + STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"), + ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"), + ROLLED_BACK(true, "The resource is deployed with the last stable spec"), + FAILED(true, "The job terminally failed"); + + @JsonIgnore private final boolean terminal; + @JsonIgnore private final String description; + + ResourceLifecycleState(boolean terminal, String description) { + this.terminal = terminal; + this.description = description; + } + + public Set getClearedStatesAfterTransition( + ResourceLifecycleState transitionFrom) { + if (this == transitionFrom) { + return Collections.emptySet(); + } + + EnumSet states = EnumSet.allOf(ResourceLifecycleState.class); + if (terminal) { + states.remove(this); + return states; + } + + if (this == UPGRADING) { + states.remove(UPGRADING); + states.remove(transitionFrom); + return states; + } + + return Collections.emptySet(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/reconciler/ReconciliationMetadata.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/reconciler/ReconciliationMetadata.java new file mode 100644 index 0000000000..049d5e06c4 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/reconciler/ReconciliationMetadata.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.reconciler; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.AbstractFlinkResource; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.ReconciliationState; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.ReconciliationStatus; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ObjectMeta; + +/** Extra metadata to be attached to the reconciled spec. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ReconciliationMetadata { + + private String apiVersion; + + private ObjectMeta metadata; + + private boolean firstDeployment; + + public ReconciliationMetadata(String apiVersion, ObjectMeta metadata, boolean firstDeployment) { + this.apiVersion = apiVersion; + this.metadata = metadata; + this.firstDeployment = firstDeployment; + } + + public static ReconciliationMetadata from(AbstractFlinkResource resource) { + ObjectMeta metadata = new ObjectMeta(); + metadata.setGeneration(resource.getMetadata().getGeneration()); + + boolean firstDeploy = + resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment() + || isFirstDeployment(resource); + + return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy); + } + + private static boolean isFirstDeployment(AbstractFlinkResource resource) { + ReconciliationStatus reconStatus = resource.getStatus().getReconciliationStatus(); + return reconStatus.getState() == ReconciliationState.DEPLOYED + && reconStatus.deserializeLastReconciledSpecWithMeta().getMeta().firstDeployment; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/AbstractFlinkSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/AbstractFlinkSpec.java new file mode 100644 index 0000000000..c4aeee8e93 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/AbstractFlinkSpec.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.DiffType; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.Diffable; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.SpecDiff; + +import java.util.Map; + +/** The common spec. */ +public abstract class AbstractFlinkSpec implements Diffable { + + /** Job specification for application deployments/session job. Null for session clusters. */ + private JobSpec job; + + /** + * Nonce used to manually trigger restart for the cluster/session job. In order to trigger + * restart, change the number to anything other than the current value. + */ + private Long restartNonce; + + /** Flink configuration overrides for the Flink deployment or Flink session job. */ + @SpecDiff.Config({ + @SpecDiff.Entry(prefix = "parallelism.default", type = DiffType.IGNORE), + @SpecDiff.Entry(prefix = "kubernetes.operator", type = DiffType.IGNORE), + }) + private Map flinkConfiguration; + + public JobSpec getJob() { + return job; + } + + public void setJob(JobSpec job) { + this.job = job; + } + + public Long getRestartNonce() { + return restartNonce; + } + + public void setRestartNonce(Long restartNonce) { + this.restartNonce = restartNonce; + } + + public Map getFlinkConfiguration() { + return flinkConfiguration; + } + + public void setFlinkConfiguration(Map flinkConfiguration) { + this.flinkConfiguration = flinkConfiguration; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkDeploymentSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkDeploymentSpec.java new file mode 100644 index 0000000000..5c3f39c256 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkDeploymentSpec.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; + +import java.util.Map; + +/** Spec that describes a Flink application or session cluster deployment. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkDeploymentSpec extends AbstractFlinkSpec { + /** Flink docker image used to start the Job and TaskManager pods. */ + private String image; + + /** Image pull policy of the Flink docker image. */ + private String imagePullPolicy; + + /** Kubernetes service used by the Flink deployment. */ + private String serviceAccount; + + /** Flink image version. */ + private FlinkVersion flinkVersion; + + /** Ingress specs. */ + private IngressSpec ingress; + + /** + * Base pod template for job and task manager pods. Can be overridden by the jobManager and + * taskManager pod templates. + */ + private Pod podTemplate; + + /** JobManager specs. */ + private JobManagerSpec jobManager; + + /** TaskManager specs. */ + private TaskManagerSpec taskManager; + + /** + * Log configuration overrides for the Flink deployment. Format logConfigFileName -> + * configContent. + */ + private Map logConfiguration; + + /** Deployment mode of the Flink cluster, native or standalone. */ + private KubernetesDeploymentMode mode; + + public String getImage() { + return image; + } + + public void setImage(String image) { + this.image = image; + } + + public String getImagePullPolicy() { + return imagePullPolicy; + } + + public void setImagePullPolicy(String imagePullPolicy) { + this.imagePullPolicy = imagePullPolicy; + } + + public String getServiceAccount() { + return serviceAccount; + } + + public void setServiceAccount(String serviceAccount) { + this.serviceAccount = serviceAccount; + } + + public FlinkVersion getFlinkVersion() { + return flinkVersion; + } + + public void setFlinkVersion(FlinkVersion flinkVersion) { + this.flinkVersion = flinkVersion; + } + + public IngressSpec getIngress() { + return ingress; + } + + public void setIngress(IngressSpec ingress) { + this.ingress = ingress; + } + + public Pod getPodTemplate() { + return podTemplate; + } + + public void setPodTemplate(Pod podTemplate) { + this.podTemplate = podTemplate; + } + + public JobManagerSpec getJobManager() { + return jobManager; + } + + public void setJobManager(JobManagerSpec jobManager) { + this.jobManager = jobManager; + } + + public TaskManagerSpec getTaskManager() { + return taskManager; + } + + public void setTaskManager(TaskManagerSpec taskManager) { + this.taskManager = taskManager; + } + + public Map getLogConfiguration() { + return logConfiguration; + } + + public void setLogConfiguration(Map logConfiguration) { + this.logConfiguration = logConfiguration; + } + + public KubernetesDeploymentMode getMode() { + return mode; + } + + public void setMode(KubernetesDeploymentMode mode) { + this.mode = mode; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkSessionJobSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkSessionJobSpec.java new file mode 100644 index 0000000000..8d6e3c8dd5 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkSessionJobSpec.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.Map; + +/** Spec that describes a Flink session job. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkSessionJobSpec extends AbstractFlinkSpec { + + /** The name of the target session cluster deployment. */ + private String deploymentName; + + public String getDeploymentName() { + return deploymentName; + } + + public void setDeploymentName(String deploymentName) { + this.deploymentName = deploymentName; + } + + public static FlinkSessionJobSpecBuilder Builder() { + return new FlinkSessionJobSpecBuilder(); + } + + public static class FlinkSessionJobSpecBuilder { + private JobSpec job; + private Long restartNonce; + private Map flinkConfiguration; + private String deploymentName; + + private FlinkSessionJobSpecBuilder() {} + + public static FlinkSessionJobSpecBuilder aFlinkSessionJobSpec() { + return new FlinkSessionJobSpecBuilder(); + } + + public FlinkSessionJobSpecBuilder job(JobSpec job) { + this.job = job; + return this; + } + + public FlinkSessionJobSpecBuilder restartNonce(Long restartNonce) { + this.restartNonce = restartNonce; + return this; + } + + public FlinkSessionJobSpecBuilder flinkConfiguration(Map flinkConfiguration) { + this.flinkConfiguration = flinkConfiguration; + return this; + } + + public FlinkSessionJobSpecBuilder deploymentName(String deploymentName) { + this.deploymentName = deploymentName; + return this; + } + + public FlinkSessionJobSpec build() { + FlinkSessionJobSpec flinkSessionJobSpec = new FlinkSessionJobSpec(); + flinkSessionJobSpec.setJob(job); + flinkSessionJobSpec.setRestartNonce(restartNonce); + flinkSessionJobSpec.setFlinkConfiguration(flinkConfiguration); + flinkSessionJobSpec.setDeploymentName(deploymentName); + return flinkSessionJobSpec; + } + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkVersion.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkVersion.java new file mode 100644 index 0000000000..717eae03b6 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/FlinkVersion.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +/** Enumeration for supported Flink versions. */ +public enum FlinkVersion { + v1_13, + v1_14, + v1_15, + v1_16, + v1_17; + + public boolean isNewerVersionThan(FlinkVersion otherVersion) { + return this.ordinal() > otherVersion.ordinal(); + } + + /** + * Returns the current version. + * + * @return The current version. + */ + public static FlinkVersion current() { + return values()[values().length - 1]; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/IngressSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/IngressSpec.java new file mode 100644 index 0000000000..dd587b0b91 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/IngressSpec.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.Map; + +/** Ingress spec. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IngressSpec { + + /** Ingress template for the JobManager service. */ + private String template; + + /** Ingress className for the Flink deployment. */ + private String className; + + /** Ingress annotations. */ + private Map annotations; +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobManagerSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobManagerSpec.java new file mode 100644 index 0000000000..8929c74848 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobManagerSpec.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; + +/** JobManager spec. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class JobManagerSpec { + /** Resource specification for the JobManager pods. */ + private Resource resource; + + /** Number of JobManager replicas. Must be 1 for non-HA deployments. */ + private int replicas = 1; + + /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */ + private Pod podTemplate; + + public Resource getResource() { + return resource; + } + + public void setResource(Resource resource) { + this.resource = resource; + } + + public int getReplicas() { + return replicas; + } + + public void setReplicas(int replicas) { + this.replicas = replicas; + } + + public Pod getPodTemplate() { + return podTemplate; + } + + public void setPodTemplate(Pod podTemplate) { + this.podTemplate = podTemplate; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobSpec.java new file mode 100644 index 0000000000..78009a037a --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobSpec.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.DiffType; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.Diffable; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.SpecDiff; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** Flink job spec. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class JobSpec implements Diffable { + + /** + * Optional URI of the job jar within the Flink docker container. For example: + * local:///opt/flink/examples/streaming/StateMachineExample.jar. If not specified the job jar + * should be available in the system classpath. + */ + private String jarURI; + + /** Parallelism of the Flink job. */ + @SpecDiff(DiffType.SCALE) + private int parallelism; + + /** Fully qualified main class name of the Flink job. */ + private String entryClass; + + /** Arguments for the Flink job main class. */ + private String[] args = new String[0]; + + /** Desired state for the job. */ + private JobState state = JobState.RUNNING; + + /** + * Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, + * change the number to anything other than the current value. + */ + @SpecDiff(DiffType.IGNORE) + private Long savepointTriggerNonce; + + /** + * Savepoint path used by the job the first time it is deployed. Upgrades/redeployments will not + * be affected. + */ + @SpecDiff(DiffType.IGNORE) + private String initialSavepointPath; + + /** Upgrade mode of the Flink job. */ + @SpecDiff(DiffType.IGNORE) + private UpgradeMode upgradeMode = UpgradeMode.STATELESS; + + /** Allow checkpoint state that cannot be mapped to any job vertex in tasks. */ + @SpecDiff(DiffType.IGNORE) + private Boolean allowNonRestoredState; + + public String getJarURI() { + return jarURI; + } + + public void setJarURI(String jarURI) { + this.jarURI = jarURI; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public String getEntryClass() { + return entryClass; + } + + public void setEntryClass(String entryClass) { + this.entryClass = entryClass; + } + + public String[] getArgs() { + return args; + } + + public void setArgs(String[] args) { + this.args = args; + } + + public JobState getState() { + return state; + } + + public void setState(JobState state) { + this.state = state; + } + + public Long getSavepointTriggerNonce() { + return savepointTriggerNonce; + } + + public void setSavepointTriggerNonce(Long savepointTriggerNonce) { + this.savepointTriggerNonce = savepointTriggerNonce; + } + + public String getInitialSavepointPath() { + return initialSavepointPath; + } + + public void setInitialSavepointPath(String initialSavepointPath) { + this.initialSavepointPath = initialSavepointPath; + } + + public UpgradeMode getUpgradeMode() { + return upgradeMode; + } + + public void setUpgradeMode(UpgradeMode upgradeMode) { + this.upgradeMode = upgradeMode; + } + + public Boolean getAllowNonRestoredState() { + return allowNonRestoredState; + } + + public void setAllowNonRestoredState(Boolean allowNonRestoredState) { + this.allowNonRestoredState = allowNonRestoredState; + } + + public static JobSpecBuilder Builder() { + return new JobSpecBuilder(); + } + + public static class JobSpecBuilder { + private String jarURI; + private int parallelism; + private String entryClass; + private String[] args; + private JobState state; + private Long savepointTriggerNonce; + private String initialSavepointPath; + private UpgradeMode upgradeMode; + private Boolean allowNonRestoredState; + + private JobSpecBuilder() {} + + public static JobSpecBuilder aJobSpec() { + return new JobSpecBuilder(); + } + + public JobSpecBuilder jarURI(String jarURI) { + this.jarURI = jarURI; + return this; + } + + public JobSpecBuilder parallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public JobSpecBuilder entryClass(String entryClass) { + this.entryClass = entryClass; + return this; + } + + public JobSpecBuilder args(String[] args) { + this.args = args; + return this; + } + + public JobSpecBuilder state(JobState state) { + this.state = state; + return this; + } + + public JobSpecBuilder savepointTriggerNonce(Long savepointTriggerNonce) { + this.savepointTriggerNonce = savepointTriggerNonce; + return this; + } + + public JobSpecBuilder initialSavepointPath(String initialSavepointPath) { + this.initialSavepointPath = initialSavepointPath; + return this; + } + + public JobSpecBuilder upgradeMode(UpgradeMode upgradeMode) { + this.upgradeMode = upgradeMode; + return this; + } + + public JobSpecBuilder allowNonRestoredState(Boolean allowNonRestoredState) { + this.allowNonRestoredState = allowNonRestoredState; + return this; + } + + public JobSpec build() { + JobSpec jobSpec = new JobSpec(); + jobSpec.setJarURI(jarURI); + jobSpec.setParallelism(parallelism); + jobSpec.setEntryClass(entryClass); + jobSpec.setArgs(args); + jobSpec.setState(state); + jobSpec.setSavepointTriggerNonce(savepointTriggerNonce); + jobSpec.setInitialSavepointPath(initialSavepointPath); + jobSpec.setUpgradeMode(upgradeMode); + jobSpec.setAllowNonRestoredState(allowNonRestoredState); + return jobSpec; + } + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobState.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobState.java new file mode 100644 index 0000000000..8210a3901e --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/JobState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonProperty; + +/** Enum describing the desired job state. */ +public enum JobState { + + /** Job is expected to be processing data. */ + @JsonProperty("running") + RUNNING, + + /** Processing is suspended with the intention of continuing later. */ + @JsonProperty("suspended") + SUSPENDED +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/KubernetesDeploymentMode.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/KubernetesDeploymentMode.java new file mode 100644 index 0000000000..af7671012f --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/KubernetesDeploymentMode.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonProperty; + +/** Enum to control Flink deployment mode on Kubernetes. */ +public enum KubernetesDeploymentMode { + + /** + * Deploys Flink using Flinks native Kubernetes support. Only supported for newer versions of + * Flink + */ + @JsonProperty("native") + NATIVE, + + /** Deploys Flink on-top of kubernetes in standalone mode. */ + @JsonProperty("standalone") + STANDALONE; + + public static KubernetesDeploymentMode getDeploymentMode(FlinkDeploymentSpec spec) { + return spec.getMode() == null ? KubernetesDeploymentMode.NATIVE : spec.getMode(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/Resource.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/Resource.java new file mode 100644 index 0000000000..72c474f47e --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/Resource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** Resource spec. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class Resource { + /** Amount of CPU allocated to the pod. */ + private Double cpu; + + /** Amount of memory allocated to the pod. Example: 1024m, 1g */ + private String memory; + + /** Amount of ephemeral storage allocated to the pod. Example: 1024m, 2G */ + private String ephemeralStorage; + + public Resource(Double cpu, String memory, String ephemeralStorage) { + this.cpu = cpu; + this.memory = memory; + this.ephemeralStorage = ephemeralStorage; + } + + public Double getCpu() { + return cpu; + } + + public void setCpu(Double cpu) { + this.cpu = cpu; + } + + public String getMemory() { + return memory; + } + + public void setMemory(String memory) { + this.memory = memory; + } + + public String getEphemeralStorage() { + return ephemeralStorage; + } + + public void setEphemeralStorage(String ephemeralStorage) { + this.ephemeralStorage = ephemeralStorage; + } + + public Resource() {} +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/TaskManagerSpec.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/TaskManagerSpec.java new file mode 100644 index 0000000000..871bf751a6 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/TaskManagerSpec.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.DiffType; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.Diffable; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.diff.SpecDiff; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.SpecReplicas; + +/** TaskManager spec. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class TaskManagerSpec implements Diffable { + /** Resource specification for the TaskManager pods. */ + private Resource resource; + + /** Number of TaskManager replicas. If defined, takes precedence over parallelism */ + @SpecDiff(DiffType.SCALE) + @SpecReplicas + private Integer replicas; + + /** TaskManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */ + private Pod podTemplate; + + public Resource getResource() { + return resource; + } + + public void setResource(Resource resource) { + this.resource = resource; + } + + public Integer getReplicas() { + return replicas; + } + + public void setReplicas(Integer replicas) { + this.replicas = replicas; + } + + public Pod getPodTemplate() { + return podTemplate; + } + + public void setPodTemplate(Pod podTemplate) { + this.podTemplate = podTemplate; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/UpgradeMode.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/UpgradeMode.java new file mode 100644 index 0000000000..b28f5fca87 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/spec/UpgradeMode.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonProperty; + +/** Enum to control Flink job upgrade behavior. */ +public enum UpgradeMode { + + /** + * Job is upgraded by first taking a savepoint of the running job, shutting it down and restoring + * from the savepoint. + */ + @JsonProperty("savepoint") + SAVEPOINT, + + /** Job is upgraded using any latest checkpoint or savepoint available. */ + @JsonProperty("last-state") + LAST_STATE, + + /** Job is upgraded with empty state. */ + @JsonProperty("stateless") + STATELESS +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/CommonStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/CommonStatus.java new file mode 100644 index 0000000000..4a00c8a6e2 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/CommonStatus.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.lifecycle.ResourceLifecycleState; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.JobState; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.PrinterColumn; + +import static org.apache.linkis.engineconnplugin.flink.client.shims.crds.lifecycle.ResourceLifecycleState.ROLLING_BACK; +import static org.apache.linkis.engineconnplugin.flink.client.shims.crds.lifecycle.ResourceLifecycleState.UPGRADING; + +/** Last observed common status of the Flink deployment/Flink SessionJob. */ +public abstract class CommonStatus { + + /** Last observed status of the Flink job on Application/Session cluster. */ + private JobStatus jobStatus = new JobStatus(); + + public JobStatus getJobStatus() { + return jobStatus; + } + + public void setJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + /** Error information about the FlinkDeployment/FlinkSessionJob. */ + private String error; + + /** Lifecycle state of the Flink resource (including being rolled back, failed etc.). */ + @PrinterColumn(name = "Lifecycle State") + // Calculated from the status, requires no setter. The purpose of this is to expose as a printer + // column. + private ResourceLifecycleState lifecycleState; + + /** + * Current reconciliation status of this resource. + * + * @return Current {@link ReconciliationStatus}. + */ + public abstract ReconciliationStatus getReconciliationStatus(); + + public ResourceLifecycleState getLifecycleState() { + ReconciliationStatus reconciliationStatus = getReconciliationStatus(); + + if (reconciliationStatus.isBeforeFirstDeployment()) { + return StringUtils.isEmpty(error) + ? ResourceLifecycleState.CREATED + : ResourceLifecycleState.FAILED; + } + + switch (reconciliationStatus.getState()) { + case UPGRADING: + return UPGRADING; + case ROLLING_BACK: + return ROLLING_BACK; + } + + AbstractFlinkSpec lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); + if (lastReconciledSpec.getJob() != null + && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) { + return ResourceLifecycleState.SUSPENDED; + } + + String jobState = getJobStatus().getState(); + if (jobState != null && FlinkJobStatus.valueOf(jobState).equals(FlinkJobStatus.FAILED)) { + return ResourceLifecycleState.FAILED; + } + + if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) { + return ResourceLifecycleState.ROLLED_BACK; + } else if (reconciliationStatus.isLastReconciledSpecStable()) { + return ResourceLifecycleState.STABLE; + } + + return ResourceLifecycleState.DEPLOYED; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentReconciliationStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentReconciliationStatus.java new file mode 100644 index 0000000000..d723f60d6a --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentReconciliationStatus.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkDeploymentSpec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** Status of the last reconcile step for the flink deployment. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkDeploymentReconciliationStatus extends ReconciliationStatus { + + @Override + public Class getSpecClass() { + return FlinkDeploymentSpec.class; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentStatus.java new file mode 100644 index 0000000000..0887e24dec --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkDeploymentStatus.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkDeploymentSpec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.HashMap; +import java.util.Map; + +/** Last observed status of the Flink deployment. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkDeploymentStatus extends CommonStatus { + + /** Information from running clusters. */ + private Map clusterInfo = new HashMap<>(); + + /** Last observed status of the JobManager deployment. */ + private JobManagerDeploymentStatus jobManagerDeploymentStatus = + JobManagerDeploymentStatus.MISSING; + + /** Status of the last reconcile operation. */ + private FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + + /** Information about the TaskManagers for the scale subresource. */ + private TaskManagerInfo taskManager; + + public Map getClusterInfo() { + return clusterInfo; + } + + public void setClusterInfo(Map clusterInfo) { + this.clusterInfo = clusterInfo; + } + + public JobManagerDeploymentStatus getJobManagerDeploymentStatus() { + return jobManagerDeploymentStatus; + } + + public void setJobManagerDeploymentStatus(JobManagerDeploymentStatus jobManagerDeploymentStatus) { + this.jobManagerDeploymentStatus = jobManagerDeploymentStatus; + } + + @Override + public FlinkDeploymentReconciliationStatus getReconciliationStatus() { + return reconciliationStatus; + } + + public void setReconciliationStatus(FlinkDeploymentReconciliationStatus reconciliationStatus) { + this.reconciliationStatus = reconciliationStatus; + } + + public TaskManagerInfo getTaskManager() { + return taskManager; + } + + public void setTaskManager(TaskManagerInfo taskManager) { + this.taskManager = taskManager; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkJobStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkJobStatus.java new file mode 100644 index 0000000000..596b7ad97a --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkJobStatus.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Possible states of a job once it has been accepted by the dispatcher. */ +public enum FlinkJobStatus { + /** + * The job has been received by the Dispatcher, and is waiting for the job manager to be created. + */ + INITIALIZING(TerminalState.NON_TERMINAL), + + /** Job is newly created, no task has started to run. */ + CREATED(TerminalState.NON_TERMINAL), + + /** Some tasks are scheduled or running, some may be pending, some may be finished. */ + RUNNING(TerminalState.NON_TERMINAL), + + /** The job has failed and is currently waiting for the cleanup to complete. */ + FAILING(TerminalState.NON_TERMINAL), + + /** The job has failed with a non-recoverable task failure. */ + FAILED(TerminalState.GLOBALLY), + + /** Job is being cancelled. */ + CANCELLING(TerminalState.NON_TERMINAL), + + /** Job has been cancelled. */ + CANCELED(TerminalState.GLOBALLY), + + /** All of the job's tasks have successfully finished. */ + FINISHED(TerminalState.GLOBALLY), + + /** The job is currently undergoing a reset and total restart. */ + RESTARTING(TerminalState.NON_TERMINAL), + + /** + * The job has been suspended which means that it has been stopped but not been removed from a + * potential HA job store. + */ + SUSPENDED(TerminalState.LOCALLY), + + /** The job is currently reconciling and waits for task execution report to recover state. */ + RECONCILING(TerminalState.NON_TERMINAL); + + // -------------------------------------------------------------------------------------------- + + private enum TerminalState { + NON_TERMINAL, + LOCALLY, + GLOBALLY + } + + private final TerminalState terminalState; + + FlinkJobStatus(TerminalState terminalState) { + this.terminalState = terminalState; + } + + /** + * Checks whether this state is globally terminal. A globally terminal job is complete and + * cannot fail any more and will not be restarted or recovered by another standby master node. + * + *

When a globally terminal state has been reached, all recovery data for the job is dropped + * from the high-availability services. + * + * @return True, if this job status is globally terminal, false otherwise. + */ + public boolean isGloballyTerminalState() { + return terminalState == TerminalState.GLOBALLY; + } + + /** + * Checks whether this state is locally terminal. Locally terminal refers to the state of a + * job's execution graph within an executing JobManager. If the execution graph is locally + * terminal, the JobManager will not continue executing or recovering the job. + * + *

The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED}, + * which is typically entered when the executing JobManager looses its leader status. + * + * @return True, if this job status is terminal, false otherwise. + */ + public boolean isTerminalState() { + return terminalState != TerminalState.NON_TERMINAL; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobReconciliationStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobReconciliationStatus.java new file mode 100644 index 0000000000..dc724f9ce0 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobReconciliationStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkSessionJobSpec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkSessionJobReconciliationStatus extends ReconciliationStatus { + + @Override + public Class getSpecClass() { + return FlinkSessionJobSpec.class; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobStatus.java new file mode 100644 index 0000000000..28db68252f --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/FlinkSessionJobStatus.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.FlinkSessionJobSpec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** Last observed status of the Flink Session job. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkSessionJobStatus extends CommonStatus { + + /** Status of the last reconcile operation. */ + private FlinkSessionJobReconciliationStatus reconciliationStatus = + new FlinkSessionJobReconciliationStatus(); + + @Override + public FlinkSessionJobReconciliationStatus getReconciliationStatus() { + return reconciliationStatus; + } + + public void setReconciliationStatus(FlinkSessionJobReconciliationStatus reconciliationStatus) { + this.reconciliationStatus = reconciliationStatus; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobManagerDeploymentStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobManagerDeploymentStatus.java new file mode 100644 index 0000000000..03bd59daf8 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobManagerDeploymentStatus.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Status of the Flink JobManager Kubernetes deployment. */ +public enum JobManagerDeploymentStatus { + + /** JobManager is running and ready to receive REST API calls. */ + READY, + + /** JobManager is running but not ready yet to receive REST API calls. */ + DEPLOYED_NOT_READY, + + /** JobManager process is starting up. */ + DEPLOYING, + + /** JobManager deployment not found, probably not started or killed by user. */ + // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup + MISSING, + + /** Deployment in terminal error, requires spec change for reconciliation to continue. */ + ERROR; +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobStatus.java new file mode 100644 index 0000000000..27fd3a334e --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/JobStatus.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.PrinterColumn; + +/** Last observed status of the Flink job within an application deployment. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class JobStatus { + /** Name of the job. */ + private String jobName; + + /** Flink JobId of the Job. */ + private String jobId; + + /** Last observed state of the job. */ + @PrinterColumn(name = "Job Status") + private String state; + + /** Start time of the job. */ + private String startTime; + + /** Update time of the job. */ + private String updateTime; + + /** Information about pending and last savepoint for the job. */ + private SavepointInfo savepointInfo = new SavepointInfo(); + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public String getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(String updateTime) { + this.updateTime = updateTime; + } + + public SavepointInfo getSavepointInfo() { + return savepointInfo; + } + + public void setSavepointInfo(SavepointInfo savepointInfo) { + this.savepointInfo = savepointInfo; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationState.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationState.java new file mode 100644 index 0000000000..211c1af94f --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Current state of the reconciliation. */ +public enum ReconciliationState { + /** The lastReconciledSpec is currently deployed. */ + DEPLOYED, + /** The spec is being upgraded. */ + UPGRADING, + /** In the process of rolling back to the lastStableSpec. */ + ROLLING_BACK, + /** Rolled back to the lastStableSpec. */ + ROLLED_BACK +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationStatus.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationStatus.java new file mode 100644 index 0000000000..7460372009 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/ReconciliationStatus.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.AbstractFlinkResource; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.utils.SpecUtils; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.utils.SpecWithMeta; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnore; + +/** Status of the last reconcile step for the FlinkDeployment/FlinkSessionJob. */ +public abstract class ReconciliationStatus { + + /** Epoch timestamp of the last successful reconcile operation. */ + private long reconciliationTimestamp; + + /** + * Last reconciled deployment spec. Used to decide whether further reconciliation steps are + * necessary. + */ + private String lastReconciledSpec; + + /** + * Last stable deployment spec according to the specified stability condition. If a rollback + * strategy is defined this will be the target to roll back to. + */ + private String lastStableSpec; + + public long getReconciliationTimestamp() { + return reconciliationTimestamp; + } + + public void setReconciliationTimestamp(long reconciliationTimestamp) { + this.reconciliationTimestamp = reconciliationTimestamp; + } + + public String getLastReconciledSpec() { + return lastReconciledSpec; + } + + public void setLastReconciledSpec(String lastReconciledSpec) { + this.lastReconciledSpec = lastReconciledSpec; + } + + public String getLastStableSpec() { + return lastStableSpec; + } + + public void setLastStableSpec(String lastStableSpec) { + this.lastStableSpec = lastStableSpec; + } + + public ReconciliationState getState() { + return state; + } + + public void setState(ReconciliationState state) { + this.state = state; + } + + /** Deployment state of the last reconciled spec. */ + private ReconciliationState state = ReconciliationState.UPGRADING; + + @JsonIgnore + public abstract Class getSpecClass(); + + @JsonIgnore + public SPEC deserializeLastReconciledSpec() { + // var specWithMeta = deserializeLastReconciledSpecWithMeta(); + SpecWithMeta specWithMeta = deserializeLastReconciledSpecWithMeta(); + return specWithMeta != null ? specWithMeta.getSpec() : null; + } + + @JsonIgnore + public SPEC deserializeLastStableSpec() { + // var specWithMeta = deserializeLastStableSpecWithMeta(); + SpecWithMeta specWithMeta = deserializeLastStableSpecWithMeta(); + return specWithMeta != null ? specWithMeta.getSpec() : null; + } + + @JsonIgnore + public SpecWithMeta deserializeLastReconciledSpecWithMeta() { + return SpecUtils.deserializeSpecWithMeta(lastReconciledSpec, getSpecClass()); + } + + @JsonIgnore + public SpecWithMeta deserializeLastStableSpecWithMeta() { + return SpecUtils.deserializeSpecWithMeta(lastStableSpec, getSpecClass()); + } + + @JsonIgnore + public void serializeAndSetLastReconciledSpec( + SPEC spec, AbstractFlinkResource resource) { + setLastReconciledSpec(SpecUtils.writeSpecWithMeta(spec, resource)); + } + + public void markReconciledSpecAsStable() { + lastStableSpec = lastReconciledSpec; + } + + @JsonIgnore + public boolean isLastReconciledSpecStable() { + if (lastReconciledSpec == null || lastStableSpec == null) { + return false; + } + return lastReconciledSpec.equals(lastStableSpec); + } + + @JsonIgnore + public boolean isBeforeFirstDeployment() { + return lastReconciledSpec == null; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/Savepoint.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/Savepoint.java new file mode 100644 index 0000000000..7152f19c53 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/Savepoint.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Represents information about a finished savepoint. */ +public class Savepoint { + /** Millisecond timestamp at the start of the savepoint operation. */ + private long timeStamp; + + /** External pointer of the savepoint can be used to recover jobs. */ + private String location; + + /** Savepoint trigger mechanism. */ + private SavepointTriggerType triggerType = SavepointTriggerType.UNKNOWN; + + /** Savepoint format. */ + private SavepointFormatType formatType = SavepointFormatType.UNKNOWN; + + /** + * Nonce value used when the savepoint was triggered manually {@link SavepointTriggerType#MANUAL}, + * null for other types of savepoints. + */ + private Long triggerNonce; + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public SavepointTriggerType getTriggerType() { + return triggerType; + } + + public void setTriggerType(SavepointTriggerType triggerType) { + this.triggerType = triggerType; + } + + public SavepointFormatType getFormatType() { + return formatType; + } + + public void setFormatType(SavepointFormatType formatType) { + this.formatType = formatType; + } + + public Long getTriggerNonce() { + return triggerNonce; + } + + public void setTriggerNonce(Long triggerNonce) { + this.triggerNonce = triggerNonce; + } + + public Savepoint( + long timeStamp, + String location, + SavepointTriggerType triggerType, + SavepointFormatType formatType, + Long triggerNonce) { + this.timeStamp = timeStamp; + this.location = location; + if (triggerType != null) { + this.triggerType = triggerType; + } + if (formatType != null) { + this.formatType = formatType; + } + this.triggerNonce = triggerNonce; + } + + public static Savepoint of(String location, long timeStamp, SavepointTriggerType triggerType) { + return new Savepoint(timeStamp, location, triggerType, SavepointFormatType.UNKNOWN, null); + } + + public static Savepoint of(String location, SavepointTriggerType triggerType) { + return new Savepoint( + System.currentTimeMillis(), location, triggerType, SavepointFormatType.UNKNOWN, null); + } + + public static Savepoint of( + String location, SavepointTriggerType triggerType, SavepointFormatType formatType) { + return new Savepoint(System.currentTimeMillis(), location, triggerType, formatType, null); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointFormatType.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointFormatType.java new file mode 100644 index 0000000000..ad2f80729c --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointFormatType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Savepoint format type. */ +public enum SavepointFormatType { + + /** A canonical, common for all state backends format. */ + CANONICAL, + /** A format specific for the chosen state backend. */ + NATIVE, + /** Savepoint format unknown, if the savepoint was not triggered by the operator. */ + UNKNOWN +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointInfo.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointInfo.java new file mode 100644 index 0000000000..9fdff38fa0 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointInfo.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import java.util.ArrayList; +import java.util.List; + +/** Stores savepoint related information. */ +public class SavepointInfo { + /** Last completed savepoint by the operator. */ + private Savepoint lastSavepoint; + + /** Trigger id of a pending savepoint operation. */ + private String triggerId; + + /** Trigger timestamp of a pending savepoint operation. */ + private Long triggerTimestamp; + + /** Savepoint trigger mechanism. */ + private SavepointTriggerType triggerType; + + /** Savepoint format. */ + private SavepointFormatType formatType; + + /** List of recent savepoints. */ + private List savepointHistory = new ArrayList<>(); + + /** Trigger timestamp of last periodic savepoint operation. */ + private long lastPeriodicSavepointTimestamp = 0L; + + public Savepoint getLastSavepoint() { + return lastSavepoint; + } + + public void setLastSavepoint(Savepoint lastSavepoint) { + this.lastSavepoint = lastSavepoint; + } + + public String getTriggerId() { + return triggerId; + } + + public void setTriggerId(String triggerId) { + this.triggerId = triggerId; + } + + public Long getTriggerTimestamp() { + return triggerTimestamp; + } + + public void setTriggerTimestamp(Long triggerTimestamp) { + this.triggerTimestamp = triggerTimestamp; + } + + public SavepointTriggerType getTriggerType() { + return triggerType; + } + + public void setTriggerType(SavepointTriggerType triggerType) { + this.triggerType = triggerType; + } + + public SavepointFormatType getFormatType() { + return formatType; + } + + public void setFormatType(SavepointFormatType formatType) { + this.formatType = formatType; + } + + public List getSavepointHistory() { + return savepointHistory; + } + + public void setSavepointHistory(List savepointHistory) { + this.savepointHistory = savepointHistory; + } + + public long getLastPeriodicSavepointTimestamp() { + return lastPeriodicSavepointTimestamp; + } + + public void setLastPeriodicSavepointTimestamp(long lastPeriodicSavepointTimestamp) { + this.lastPeriodicSavepointTimestamp = lastPeriodicSavepointTimestamp; + } + + public void setTrigger( + String triggerId, SavepointTriggerType triggerType, SavepointFormatType formatType) { + this.triggerId = triggerId; + this.triggerTimestamp = System.currentTimeMillis(); + this.triggerType = triggerType; + this.formatType = formatType; + } + + public void resetTrigger() { + this.triggerId = null; + this.triggerTimestamp = null; + this.triggerType = null; + this.formatType = null; + } + + /** + * Update last savepoint info and add the savepoint to the history if it isn't already the most + * recent savepoint. + * + * @param savepoint Savepoint to be added. + */ + public void updateLastSavepoint(Savepoint savepoint) { + if (lastSavepoint == null || !lastSavepoint.getLocation().equals(savepoint.getLocation())) { + lastSavepoint = savepoint; + savepointHistory.add(savepoint); + if (savepoint.getTriggerType() == SavepointTriggerType.PERIODIC) { + lastPeriodicSavepointTimestamp = savepoint.getTimeStamp(); + } + } + resetTrigger(); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointTriggerType.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointTriggerType.java new file mode 100644 index 0000000000..f148fe9d74 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/SavepointTriggerType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +/** Savepoint trigger mechanism. */ +public enum SavepointTriggerType { + /** Savepoint manually triggered by changing the savepointTriggerNonce. */ + MANUAL, + /** Savepoint periodically triggered by the operator. */ + PERIODIC, + /** Savepoint triggered during stateful upgrade. */ + UPGRADE, + /** Savepoint trigger mechanism unknown, such as savepoint retrieved directly from Flink job. */ + UNKNOWN +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/TaskManagerInfo.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/TaskManagerInfo.java new file mode 100644 index 0000000000..2566bcf93a --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/status/TaskManagerInfo.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.status; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.LabelSelector; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.model.annotation.StatusReplicas; + +/** Last observed status of the Flink job within an application deployment. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class TaskManagerInfo { + /** TaskManager label selector. */ + @LabelSelector private String labelSelector; + + /** Number of TaskManager replicas if defined in the spec. */ + @StatusReplicas private int replicas = 0; +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecUtils.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecUtils.java new file mode 100644 index 0000000000..28e7fb7ec6 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecUtils.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.utils; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.AbstractFlinkResource; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.reconciler.ReconciliationMetadata; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec; + +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.node.ObjectNode; + +/** Spec utilities. */ +public class SpecUtils { + public static final String INTERNAL_METADATA_JSON_KEY = "resource_metadata"; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Deserializes the spec and custom metadata object from JSON. + * + * @param specWithMetaString JSON string. + * @param specClass Spec class for deserialization. + * @param Spec type. + * @return SpecWithMeta of spec and meta. + */ + public static SpecWithMeta deserializeSpecWithMeta( + String specWithMetaString, Class specClass) { + if (specWithMetaString == null) { + return null; + } + + try { + ObjectNode wrapper = (ObjectNode) objectMapper.readTree(specWithMetaString); + ObjectNode internalMeta = (ObjectNode) wrapper.remove(INTERNAL_METADATA_JSON_KEY); + + if (internalMeta == null) { + // migrating from old format + wrapper.remove("apiVersion"); + return new SpecWithMeta<>(objectMapper.treeToValue(wrapper, specClass), null); + } else { + return new SpecWithMeta<>( + objectMapper.treeToValue(wrapper.get("spec"), specClass), + objectMapper.convertValue(internalMeta, ReconciliationMetadata.class)); + } + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e); + } + } + + /** + * Serializes the spec and custom meta information into a JSON string. + * + * @param spec Flink resource spec. + * @param relatedResource Related Flink resource for creating the meta object. + * @return Serialized json. + */ + public static String writeSpecWithMeta( + AbstractFlinkSpec spec, AbstractFlinkResource relatedResource) { + return writeSpecWithMeta(spec, ReconciliationMetadata.from(relatedResource)); + } + + /** + * Serializes the spec and custom meta information into a JSON string. + * + * @param spec Flink resource spec. + * @param metadata Reconciliation meta object. + * @return Serialized json. + */ + public static String writeSpecWithMeta(AbstractFlinkSpec spec, ReconciliationMetadata metadata) { + + ObjectNode wrapper = objectMapper.createObjectNode(); + + wrapper.set("spec", objectMapper.valueToTree(checkNotNull(spec))); + wrapper.set(INTERNAL_METADATA_JSON_KEY, objectMapper.valueToTree(checkNotNull(metadata))); + + try { + return objectMapper.writeValueAsString(wrapper); + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not serialize spec, this indicates a bug...", e); + } + } + + // We do not have access to Flink's Preconditions from here + private static T checkNotNull(T object) { + if (object == null) { + throw new NullPointerException(); + } else { + return object; + } + } + + public static T clone(T object) { + if (object == null) { + return null; + } + try { + return (T) objectMapper.readValue(objectMapper.writeValueAsString(object), object.getClass()); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecWithMeta.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecWithMeta.java new file mode 100644 index 0000000000..4321d9c6f4 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/crds/utils/SpecWithMeta.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.crds.utils; + +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.reconciler.ReconciliationMetadata; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.AbstractFlinkSpec; + +/** + * Utility class for encapsulating Kubernetes resource spec and meta fields during serialization. + */ +public class SpecWithMeta { + T spec; + ReconciliationMetadata meta; + + public SpecWithMeta(T spec, ReconciliationMetadata meta) { + this.spec = spec; + this.meta = meta; + } + + public T getSpec() { + return spec; + } + + public void setSpec(T spec) { + this.spec = spec; + } + + public ReconciliationMetadata getMeta() { + return meta; + } + + public void setMeta(ReconciliationMetadata meta) { + this.meta = meta; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/deployment/FlinkDeploymentOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/deployment/FlinkDeploymentOperatorClusterDescriptorAdapter.java new file mode 100644 index 0000000000..cfd628ad1c --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/deployment/FlinkDeploymentOperatorClusterDescriptorAdapter.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.deployment; + +import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.FlinkDeployment; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.FlinkDeploymentList; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.spec.*; +import org.apache.linkis.engineconnplugin.flink.client.shims.crds.status.FlinkDeploymentStatus; +import org.apache.linkis.engineconnplugin.flink.client.shims.util.FlinkKubernetesHelper; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ObjectMeta; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.CustomResource; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.WatcherException; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.MixedOperation; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; + +import java.io.Closeable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkDeploymentOperatorClusterDescriptorAdapter implements Closeable { + private static final Logger logger = + LoggerFactory.getLogger(FlinkDeploymentOperatorClusterDescriptorAdapter.class); + + private FlinkKubernetesOperatorConfig config; + + private KubernetesClient client; + + private JobID jobId; + + private JobStatus jobStatus; + + public FlinkDeploymentOperatorClusterDescriptorAdapter(FlinkKubernetesOperatorConfig config) { + this.config = config; + this.client = + FlinkKubernetesHelper.getKubernetesClientByKubeConfigFile(config.getK8sConfigFile()); + } + + public void deployCluster(String[] programArguments, String applicationClassName) { + logger.info("The flink k8s operator task start,k8sNamespace: {}", config.getK8sNamespace()); + CustomResourceDefinitionList crds = + client.apiextensions().v1().customResourceDefinitions().list(); + + String FlinkDeploymentCRDName = CustomResource.getCRDName(FlinkDeployment.class); + List flinkCRDList = + crds.getItems().stream() + .filter(crd -> crd.getMetadata().getName().equals(FlinkDeploymentCRDName)) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(flinkCRDList)) { + throw new RuntimeException("The flink operator crd does not exist"); + } + + NonNamespaceOperation< + FlinkDeployment, + FlinkDeploymentList, + org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.Resource< + FlinkDeployment>> + flinkDeploymentClient = getFlinkDeploymentClient(); + + FlinkDeployment flinkDeployment = + getFlinkDeployment(config.getFlinkAppName(), config.getK8sNamespace()); + + FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec(); + flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_16); + flinkDeploymentSpec.setImage(config.getK8sImage()); + flinkDeploymentSpec.setFlinkConfiguration(config.getFlinkConfiguration()); + flinkDeployment.setSpec(flinkDeploymentSpec); + flinkDeploymentSpec.setServiceAccount(config.getK8sServiceAccount()); + JobManagerSpec jobManagerSpec = new JobManagerSpec(); + jobManagerSpec.setResource( + new Resource( + Double.valueOf(config.getJobmanagerCpu()), + config.getJobmanagerMemory(), + config.getJobmanagerMemory())); + flinkDeploymentSpec.setJobManager(jobManagerSpec); + TaskManagerSpec taskManagerSpec = new TaskManagerSpec(); + taskManagerSpec.setResource( + new Resource( + Double.valueOf(config.getTaskmanagerCpu()), + config.getTaskmanagerMemory(), + config.getTaskmanagerMemory())); + flinkDeploymentSpec.setTaskManager(taskManagerSpec); + flinkDeployment + .getSpec() + .setJob( + JobSpec.Builder() + .jarURI(config.getJar()) + .parallelism(config.getParallelism()) + .upgradeMode(UpgradeMode.STATELESS) + .build()); + + logger.info("Flink k8s operator task parameters: {}", flinkDeploymentSpec); + flinkDeployment.setSpec(flinkDeploymentSpec); + + FlinkDeployment created = flinkDeploymentClient.createOrReplace(flinkDeployment); + logger.info("Preparing to submit the Flink k8s operator Task: {}", created); + + // Wait three seconds to get the status + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + + } + + FlinkDeploymentList list = getFlinkDeploymentClient().list(); + + List flinkDeployments = + list.getItems().stream() + .filter(crd -> crd.getMetadata().getName().equals(config.getFlinkAppName())) + .collect(Collectors.toList()); + + if (CollectionUtils.isNotEmpty(flinkDeployments)) { + FlinkDeploymentStatus status = flinkDeployments.get(0).getStatus(); + if (Objects.nonNull(status)) { + jobId = JobID.fromHexString(status.getJobStatus().getJobId()); + jobStatus = JobStatus.valueOf(status.getJobStatus().getState()); + logger.info( + "Flink k8s operator task: {},status: {}", + config.getFlinkAppName(), + jobStatus.toString()); + } + } + } + + public void startFlinkKubernetesOperatorWatcher() { + getFlinkDeploymentClient() + .inNamespace(this.config.getK8sNamespace()) + .withName(this.config.getFlinkAppName()) + .watch( + new Watcher() { + @Override + public void eventReceived(Action action, FlinkDeployment FlinkDeployment) { + if (Objects.nonNull(FlinkDeployment.getStatus())) { + jobStatus = + JobStatus.valueOf(FlinkDeployment.getStatus().getJobStatus().getState()); + jobId = + JobID.fromHexString(FlinkDeployment.getStatus().getJobStatus().getJobId()); + logger.info( + "Flink kubernetes operator task name:{},jobId:{},state:{}", + config.getFlinkAppName(), + jobId, + jobStatus); + } + } + + @Override + public void onClose(WatcherException e) { + // Invoked when the watcher closes due to an Exception.Restart Watcher. + logger.error("Use k8s watch obtain the status failed", e); + startFlinkKubernetesOperatorWatcher(); + } + }); + } + + @Override + public void close() { + logger.info("Start to close job {}.", config.getFlinkAppName()); + FlinkDeployment FlinkDeployment = + getFlinkDeployment(config.getFlinkAppName(), config.getK8sNamespace()); + getFlinkDeploymentClient().delete(FlinkDeployment); + client.close(); + } + + public JobID getJobId() { + return jobId; + } + + public JobStatus getJobStatus() { + return jobStatus; + } + + public MixedOperation< + FlinkDeployment, + FlinkDeploymentList, + org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.Resource< + FlinkDeployment>> + getFlinkDeploymentClient() { + return client.customResources(FlinkDeployment.class, FlinkDeploymentList.class); + } + + public FlinkDeployment getFlinkDeployment(String name, String namespace) { + FlinkDeployment FlinkDeployment = new FlinkDeployment(); + ObjectMeta metadata = new ObjectMeta(); + metadata.setName(name); + metadata.setNamespace(namespace); + FlinkDeployment.setMetadata(metadata); + return FlinkDeployment; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/util/FlinkKubernetesHelper.java b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/util/FlinkKubernetesHelper.java new file mode 100644 index 0000000000..440302216b --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims-1.16.2/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/util/FlinkKubernetesHelper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.util; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.*; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkKubernetesHelper { + private static final Logger logger = LoggerFactory.getLogger(FlinkKubernetesHelper.class); + + public static KubernetesClient getKubernetesClientByKubeConfigFile(String kubeConfigFile) { + final Config config; + + if (kubeConfigFile != null) { + try { + config = + Config.fromKubeconfig( + null, + FileUtils.readFileToString(new File(kubeConfigFile), StandardCharsets.UTF_8), + null); + } catch (IOException e) { + throw new KubernetesClientException("Load kubernetes config failed.", e); + } + } else { + config = Config.autoConfigure(null); + } + + DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient(config); + logger.info( + "KubernetesClient Create success,kubernetesClient masterUrl: {}", + kubernetesClient.getMasterUrl().toString()); + return kubernetesClient; + } +} diff --git a/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/FlinkShims.java b/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/FlinkShims.java index de9794e281..2ac72d5bce 100644 --- a/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/FlinkShims.java +++ b/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/FlinkShims.java @@ -17,17 +17,26 @@ package org.apache.linkis.engineconnplugin.flink.client.shims; +import org.apache.linkis.engineconnplugin.flink.client.shims.config.FlinkKubernetesOperatorConfig; import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; + +import java.io.Closeable; import java.lang.reflect.Constructor; import java.util.concurrent.CompletableFuture; -public abstract class FlinkShims { +public abstract class FlinkShims implements Closeable { private static FlinkShims flinkShims; protected String flinkVersion; + private JobID jobId; + + private JobStatus jobStatus; + public FlinkShims(String flinkVersion) { this.flinkVersion = flinkVersion; } @@ -78,4 +87,20 @@ public abstract CompletableFuture cancelWithSavepoint( public abstract CompletableFuture stopWithSavepoint( Object clusterClient, Object jobId, boolean advanceToEndOfEventTime, String savepoint); + + public abstract void deployKubernetesOperator( + String[] programArguments, String applicationClassName, FlinkKubernetesOperatorConfig config); + + public JobID getJobId() { + return jobId; + } + + public JobStatus getJobStatus() { + return jobStatus; + } + + public void startFlinkKubernetesOperatorWatcher() {} + + @Override + public void close() {} } diff --git a/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/config/FlinkKubernetesOperatorConfig.java b/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/config/FlinkKubernetesOperatorConfig.java new file mode 100644 index 0000000000..f0ed684121 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-shims/src/main/java/org/apache/linkis/engineconnplugin/flink/client/shims/config/FlinkKubernetesOperatorConfig.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.shims.config; + +import java.util.Map; + +public class FlinkKubernetesOperatorConfig { + private String k8sConfigFile; + + private String k8sNamespace; + + private String k8sImage; + + private String k8sServiceAccount; + + private String jar; + + private String flinkAppName; + + private String jobmanagerMemory; + + private String taskmanagerMemory; + + private String jobmanagerCpu; + + private String taskmanagerCpu; + + private Integer parallelism; + + private Map flinkConfiguration; + + public String getFlinkAppName() { + return flinkAppName; + } + + public String getJobmanagerMemory() { + return jobmanagerMemory; + } + + public String getTaskmanagerMemory() { + return taskmanagerMemory; + } + + public String getJobmanagerCpu() { + return jobmanagerCpu; + } + + public String getTaskmanagerCpu() { + return taskmanagerCpu; + } + + public Map getFlinkConfiguration() { + return flinkConfiguration; + } + + public String getK8sConfigFile() { + return k8sConfigFile; + } + + public String getK8sNamespace() { + return k8sNamespace; + } + + public String getK8sImage() { + return k8sImage; + } + + public String getK8sServiceAccount() { + return k8sServiceAccount; + } + + public String getJar() { + return jar; + } + + public Integer getParallelism() { + return parallelism; + } + + public static FlinkKubernetesOperatorConfigBuilder Builder() { + return new FlinkKubernetesOperatorConfigBuilder(); + } + + public static class FlinkKubernetesOperatorConfigBuilder { + private String k8sConfigFile; + private String k8sNamespace; + private String k8sImage; + private String k8sServiceAccount; + private String jar; + private String flinkAppName; + private String jobmanagerMemory; + private String taskmanagerMemory; + private String jobmanagerCpu; + private String taskmanagerCpu; + private Integer parallelism; + private Map flinkConfiguration; + + private FlinkKubernetesOperatorConfigBuilder() {} + + public static FlinkKubernetesOperatorConfigBuilder aFlinkKubernetesOperatorConfig() { + return new FlinkKubernetesOperatorConfigBuilder(); + } + + public FlinkKubernetesOperatorConfigBuilder k8sConfigFile(String k8sConfigFile) { + this.k8sConfigFile = k8sConfigFile; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder k8sNamespace(String k8sNamespace) { + this.k8sNamespace = k8sNamespace; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder k8sImage(String k8sImage) { + this.k8sImage = k8sImage; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder k8sServiceAccount(String k8sServiceAccount) { + this.k8sServiceAccount = k8sServiceAccount; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder jar(String jar) { + this.jar = jar; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder flinkAppName(String flinkAppName) { + this.flinkAppName = flinkAppName; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder jobmanagerMemory(String jobmanagerMemory) { + this.jobmanagerMemory = jobmanagerMemory; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder taskmanagerMemory(String taskmanagerMemory) { + this.taskmanagerMemory = taskmanagerMemory; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder jobmanagerCpu(String jobmanagerCpu) { + this.jobmanagerCpu = jobmanagerCpu; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder taskmanagerCpu(String taskmanagerCpu) { + this.taskmanagerCpu = taskmanagerCpu; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder parallelism(Integer parallelism) { + this.parallelism = parallelism; + return this; + } + + public FlinkKubernetesOperatorConfigBuilder flinkConfiguration( + Map flinkConfiguration) { + this.flinkConfiguration = flinkConfiguration; + return this; + } + + public FlinkKubernetesOperatorConfig build() { + FlinkKubernetesOperatorConfig flinkKubernetesOperatorConfig = + new FlinkKubernetesOperatorConfig(); + flinkKubernetesOperatorConfig.taskmanagerMemory = this.taskmanagerMemory; + flinkKubernetesOperatorConfig.jobmanagerMemory = this.jobmanagerMemory; + flinkKubernetesOperatorConfig.flinkAppName = this.flinkAppName; + flinkKubernetesOperatorConfig.parallelism = this.parallelism; + flinkKubernetesOperatorConfig.taskmanagerCpu = this.taskmanagerCpu; + flinkKubernetesOperatorConfig.k8sConfigFile = this.k8sConfigFile; + flinkKubernetesOperatorConfig.k8sImage = this.k8sImage; + flinkKubernetesOperatorConfig.k8sServiceAccount = this.k8sServiceAccount; + flinkKubernetesOperatorConfig.jar = this.jar; + flinkKubernetesOperatorConfig.jobmanagerCpu = this.jobmanagerCpu; + flinkKubernetesOperatorConfig.k8sNamespace = this.k8sNamespace; + flinkKubernetesOperatorConfig.flinkConfiguration = this.flinkConfiguration; + return flinkKubernetesOperatorConfig; + } + } +}