From bca9fe22ffa3f5bfaedfd18ed631ecf6d23e6c30 Mon Sep 17 00:00:00 2001 From: LiuGuoHua <129264181+sjgllgh@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:26:34 +0800 Subject: [PATCH] Add a containerized mode to the ECM service (#5201) * Add a containerized mode to the ECM service, which allows assigning specific IPs and ports for communication with the outside world to particular engines in this mode. For instance, a Spark engine requires at least two ports: spark.driver.port and spark.driver.blockManager.port. * Supplement the SQL for database modification * remove code that may cause compilation to fail * improve the linkis_ddl.sql in configmap-init-sql.yaml * Add all parameters from SparkConf.getConf to sparkLauncher. * Add a method addAllConf to SparkConf. * formatted code * Fix the missing ticketId field in the linkis_cg_manager_service_instance within the configmap-init-sql.yaml file. * Fix the missing observe_info field in the linkis_ps_job_history_group_history within the configmap-init-sql.yaml file. * print lm log * add debug log * Add fields(mapping_host, mapping_ports) to the linkis_cg_manager_service_instance for ddl file. * remove debug action --------- Co-authored-by: peacewong --- .github/workflows/integration-test.yml | 5 +- .../linkis/common/ServiceInstance.scala | 20 +++++ .../enums/MappingPortStrategyName.java | 39 +++++++++ .../strategy/MappingPortContext.java | 32 ++++++++ .../strategy/MappingPortStrategy.java | 24 ++++++ .../strategy/StaticMappingPortStrategy.java | 79 ++++++++++++++++++ .../ecm/core/conf/ContainerizationConf.scala | 44 ++++++++++ .../core/launch/ProcessEngineConnLaunch.scala | 45 ++++++++++ .../AbstractEngineConnLaunchService.scala | 4 +- .../linkis/manager/am/vo/AMEngineNodeVo.java | 20 +++++ .../linkis/manager/am/utils/AMUtils.scala | 2 + .../entity/persistence/PersistenceNode.java | 19 +++++ .../impl/DefaultNodeManagerPersistence.java | 8 ++ .../mapper/common/NodeManagerMapper.xml | 6 ++ .../linkis/templates/configmap-init-sql.yaml | 4 + linkis-dist/package/db/linkis_ddl.sql | 2 + linkis-dist/package/db/linkis_ddl_pg.sql | 2 + .../upgrade/1.7.0_schema/mysql/linkis_ddl.sql | 4 + .../spark/client/context/SparkConfig.java | 8 ++ ...rnApplicationClusterDescriptorAdapter.java | 3 + .../SparkContainerizationEngineConnHook.java | 82 +++++++++++++++++++ .../resources/linkis-engineconn.properties | 2 +- .../spark/config/SparkConfiguration.scala | 14 ++++ .../factory/SparkEngineConnFactory.scala | 37 +++++++++ 24 files changed, 500 insertions(+), 5 deletions(-) create mode 100644 linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java create mode 100644 linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java create mode 100644 linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java create mode 100644 linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java create mode 100644 linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 85d3b9a23e..6e4e5097a2 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -166,7 +166,7 @@ jobs: # Show port-forward list bash ./linkis-dist/helm/scripts/remote-proxy.sh list # Check if the web service is available - curl http://127.0.0.1:8088/indexhtml + curl http://127.0.0.1:8088/ # Execute test by linkis-cli POD_NAME=`kubectl get pods -n linkis -l app.kubernetes.io/instance=linkis-demo-mg-gateway -o jsonpath='{.items[0].metadata.name}'` @@ -182,5 +182,4 @@ jobs: #kubectl exec -it -n linkis ${POD_NAME} -- bash -c " \ #sh /opt/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code 'show databases' " - - shell: bash \ No newline at end of file + shell: bash diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala index f9e4718472..08974df281 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala @@ -21,11 +21,19 @@ class ServiceInstance { private var applicationName: String = _ private var instance: String = _ private var registryTimestamp: Long = _ + private var mappingPorts: String = _ + private var mappingHost: String = _ def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName def getApplicationName: String = applicationName def setInstance(instance: String): Unit = this.instance = instance def getInstance: String = instance + def setMappingPorts(mappingPorts: String): Unit = this.mappingPorts = mappingPorts + def getMappingPorts: String = mappingPorts + + def setMappingHost(mappingHost: String): Unit = this.mappingHost = mappingHost + def getMappingHost: String = mappingHost + def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp = registryTimestamp @@ -62,6 +70,18 @@ object ServiceInstance { serviceInstance } + def apply( + applicationName: String, + instance: String, + mappingPorts: String, + mappingHost: String + ): ServiceInstance = { + val serviceInstance = apply(applicationName, instance) + serviceInstance.setMappingPorts(mappingPorts) + serviceInstance.setMappingHost(mappingHost) + serviceInstance + } + def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = { val serviceInstance = new ServiceInstance serviceInstance.setApplicationName(applicationName) diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java new file mode 100644 index 0000000000..7d43fc9ae0 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.core.containerization.enums; + +public enum MappingPortStrategyName { + STATIC("static"); + private String name; + + MappingPortStrategyName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public static MappingPortStrategyName toEnum(String name) { + return MappingPortStrategyName.valueOf(name.toUpperCase()); + } +} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java new file mode 100644 index 0000000000..ef31b2562d --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.core.containerization.strategy; + +import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName; + +public class MappingPortContext { + + public static MappingPortStrategy getInstance(MappingPortStrategyName strategyName) { + switch (strategyName) { + case STATIC: + return new StaticMappingPortStrategy(); + default: + return new StaticMappingPortStrategy(); + } + } +} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java new file mode 100644 index 0000000000..21cf2ac3a0 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.core.containerization.strategy; + +import java.io.IOException; + +public interface MappingPortStrategy { + int availablePort() throws IOException; +} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java new file mode 100644 index 0000000000..6ea627093f --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.core.containerization.strategy; + +import org.apache.linkis.ecm.core.conf.ContainerizationConf; + +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +public class StaticMappingPortStrategy implements MappingPortStrategy { + + private static final AtomicInteger currentIndex = new AtomicInteger(0); + + @Override + public int availablePort() throws IOException { + return getNewPort(10); + } + + public int getNewPort(int retryNum) throws IOException { + int[] portRange = getPortRange(); + if (retryNum == 0) { + throw new IOException( + "No available port in the portRange: " + + ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE() + .getValue()); + } + moveIndex(); + int minPort = portRange[0]; + int newPort = minPort + currentIndex.get() - 1; + ServerSocket socket = null; + try { + socket = new ServerSocket(newPort); + } catch (Exception e) { + return getNewPort(--retryNum); + } finally { + IOUtils.close(socket); + } + return newPort; + } + + private synchronized void moveIndex() { + int poolSize = getPoolSize(); + currentIndex.set(currentIndex.get() % poolSize + 1); + } + + private int[] getPortRange() { + String portRange = + ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE().getValue(); + + return Arrays.stream(portRange.split("-")).mapToInt(Integer::parseInt).toArray(); + } + + private int getPoolSize() { + int[] portRange = getPortRange(); + int minPort = portRange[0]; + int maxPort = portRange[1]; + + return maxPort - minPort + 1; + } +} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala new file mode 100644 index 0000000000..a7461210f1 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.core.conf + +import org.apache.linkis.common.conf.CommonVars + +object ContainerizationConf { + + val ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE = + CommonVars("linkis.engine.containerization.static.port.range", "1-65535") + + val ENGINE_CONN_CONTAINERIZATION_ENABLE = + CommonVars("linkis.engine.containerization.enable", false).getValue + + val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST = + CommonVars("linkis.engine.containerization.mapping.host", "") + + val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS = + CommonVars("linkis.engine.containerization.mapping.ports", "") + + val ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY = + CommonVars("linkis.engine.containerization.mapping.strategy", "static") + + // 引擎类型-需要开启的端口数量 + // Engine Type - Number of Ports Required to Be Opened + val ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST = + CommonVars("linkis.engine.containerization.engine.list", "spark-2,") + +} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala index cc79e24d4f..4dfb96b3d5 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala @@ -20,6 +20,15 @@ package org.apache.linkis.ecm.core.launch import org.apache.linkis.common.conf.{CommonVars, Configuration} import org.apache.linkis.common.exception.ErrorException import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.ecm.core.conf.ContainerizationConf.{ + ENGINE_CONN_CONTAINERIZATION_ENABLE, + ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST, + ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST, + ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS, + ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY +} +import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName +import org.apache.linkis.ecm.core.containerization.strategy.MappingPortContext import org.apache.linkis.ecm.core.errorcode.LinkisECMErrorCodeSummary._ import org.apache.linkis.ecm.core.exception.ECMCoreException import org.apache.linkis.ecm.core.utils.PortUtils @@ -35,6 +44,7 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants._ +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.commons.io.FileUtils import org.apache.commons.lang3.StringUtils @@ -54,6 +64,9 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging { private var engineConnPort: String = _ + private var mappingPorts: String = "" + private var mappingHost: String = _ + protected def newProcessEngineConnCommandBuilder(): ProcessEngineCommandBuilder = new UnixProcessEngineCommandBuilder @@ -142,6 +155,10 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging { def getEngineConnPort: String = engineConnPort + def getMappingPorts: String = mappingPorts + + def getMappingHost: String = mappingHost + protected def getProcess(): Process = this.process /** @@ -166,6 +183,20 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging { .findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue) .toString + val engineType = LabelUtil.getEngineType(request.labels) + var engineMappingPortSize = getEngineMappingPortSize(engineType) + if (ENGINE_CONN_CONTAINERIZATION_ENABLE && engineMappingPortSize > 0) { + val strategyName = ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY.getValue + val mappingPortStrategy = + MappingPortContext.getInstance(MappingPortStrategyName.toEnum(strategyName)) + + while (engineMappingPortSize > 0) { + mappingPorts += mappingPortStrategy.availablePort() + "," + engineMappingPortSize = engineMappingPortSize - 1 + } + mappingHost = ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.getValue + } + var springConf = Map[String, String]("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn") val properties = @@ -188,10 +219,24 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging { engineConnConf = engineConnConf ++: request.creationDesc.properties.asScala .filterNot(_._1.startsWith("spring.")) .toMap + + engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS.key -> mappingPorts) + engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.key -> mappingHost) + arguments.addEngineConnConf(engineConnConf) EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build()) } + def getEngineMappingPortSize(engineType: String): Int = { + val engineList = ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST.getValue + val infoList = engineList.trim + .split(",") + .map(_.split("-")) + .filter(engine => engine(0).equals(engineType)) + if (infoList.length > 0) infoList(0)(1).toInt + else 0 + } + override def kill(): Unit = { if (process != null) { process.destroy() diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index df00ed4960..771ceede8f 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -92,7 +92,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w case pro: ProcessEngineConnLaunch => val serviceInstance = ServiceInstance( GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue, - ECMUtils.getInstanceByPort(pro.getEngineConnPort) + ECMUtils.getInstanceByPort(pro.getEngineConnPort), + pro.getMappingPorts, + pro.getMappingHost ) conn.setServiceInstance(serviceInstance) case _ => diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java index d7208f211a..c88f696973 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java @@ -80,6 +80,10 @@ public class AMEngineNodeVo { private String engineType; + private String mappingPorts; + + private String mappingHost; + public String getEmInstance() { return emInstance; } @@ -288,4 +292,20 @@ public String getEngineType() { public void setEngineType(String engineType) { this.engineType = engineType; } + + public String getMappingPorts() { + return mappingPorts; + } + + public void setMappingPorts(String mappingPorts) { + this.mappingPorts = mappingPorts; + } + + public String getMappingHost() { + return mappingHost; + } + + public void setMappingHost(String mappingHost) { + this.mappingHost = mappingHost; + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala index 428e5d23e9..a2f4ad97ae 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala @@ -167,6 +167,8 @@ object AMUtils { AMEngineNodeVo.setLabels(node.getLabels) AMEngineNodeVo.setApplicationName(node.getServiceInstance.getApplicationName) AMEngineNodeVo.setInstance(node.getServiceInstance.getInstance) + AMEngineNodeVo.setMappingHost(node.getServiceInstance.getMappingHost) + AMEngineNodeVo.setMappingPorts(node.getServiceInstance.getMappingPorts) if (null != node.getEMNode) { AMEngineNodeVo.setEmInstance(node.getEMNode.getServiceInstance.getInstance) } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java index 770a2e528a..026f5c6453 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java @@ -36,6 +36,9 @@ public class PersistenceNode { private String updator; private String creator; + private String mappingPorts; + private String mappingHost; + public String getMark() { return mark; } @@ -123,4 +126,20 @@ public String getCreator() { public void setCreator(String creator) { this.creator = creator; } + + public String getMappingPorts() { + return mappingPorts; + } + + public void setMappingPorts(String mappingPorts) { + this.mappingPorts = mappingPorts; + } + + public String getMappingHost() { + return mappingHost; + } + + public void setMappingHost(String mappingHost) { + this.mappingHost = mappingHost; + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java index 31a8742d9f..40f479f496 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java @@ -109,6 +109,10 @@ public void updateEngineNode(ServiceInstance serviceInstance, Node node) node.getOwner()); // The creator is not given when inserting records in rm, so you need to // set this value(rm中插入记录的时候并未给出creator,所以需要set这个值) persistenceNode.setUpdator(node.getOwner()); + String mappingPorts = node.getServiceInstance().getMappingPorts(); + persistenceNode.setMappingPorts(mappingPorts); + String mappingHost = node.getServiceInstance().getMappingHost(); + persistenceNode.setMappingHost(mappingHost); try { nodeManagerMapper.updateNodeInstance(serviceInstance.getInstance(), persistenceNode); nodeManagerMapper.updateNodeRelation( @@ -277,6 +281,8 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) ServiceInstance emServiceInstance = new ServiceInstance(); emServiceInstance.setApplicationName(emName); emServiceInstance.setInstance(emInstance); + emServiceInstance.setMappingPorts(String.valueOf(emNode.getMappingPorts())); + emServiceInstance.setMappingHost(String.valueOf(emNode.getMappingHost())); AMEMNode amemNode = new AMEMNode(); amemNode.setMark(emNode.getMark()); amemNode.setOwner(emNode.getOwner()); @@ -331,6 +337,8 @@ public List getEngineNodeByEM(ServiceInstance serviceInstance) ServiceInstance engineServiceInstance = new ServiceInstance(); engineServiceInstance.setInstance(engineNode.getInstance()); engineServiceInstance.setApplicationName(engineNode.getName()); + engineServiceInstance.setMappingPorts(String.valueOf(engineNode.getMappingPorts())); + engineServiceInstance.setMappingHost(String.valueOf(engineNode.getMappingHost())); amEngineNode.setServiceInstance(engineServiceInstance); amEngineNode.setOwner(engineNode.getOwner()); amEngineNode.setMark(engineNode.getMark()); diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml index 3d41a7b05b..3d7782c21f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml @@ -66,6 +66,12 @@ identifier = #{persistenceNode.identifier}, + + mapping_ports = #{persistenceNode.mappingPorts}, + + + mapping_host = #{persistenceNode.mappingHost}, + WHERE instance = #{instance} diff --git a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml index df7117efeb..634c089220 100644 --- a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml +++ b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml @@ -193,6 +193,7 @@ data: `engine_type` varchar(32) DEFAULT NULL COMMENT 'Engine type', `execution_code` text DEFAULT NULL COMMENT 'Job origin code or code path', `result_location` varchar(500) DEFAULT NULL COMMENT 'File path of the resultsets', + `observe_info` varchar(500) DEFAULT NULL COMMENT 'The notification information configuration of this job', PRIMARY KEY (`id`), KEY `created_time` (`created_time`), KEY `submit_user` (`submit_user`) @@ -723,6 +724,9 @@ data: `owner` varchar(32) COLLATE utf8_bin DEFAULT NULL, `mark` varchar(32) COLLATE utf8_bin DEFAULT NULL, `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL, + `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL, + `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL, + `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL, `update_time` datetime DEFAULT CURRENT_TIMESTAMP, `create_time` datetime DEFAULT CURRENT_TIMESTAMP, `updator` varchar(32) COLLATE utf8_bin DEFAULT NULL, diff --git a/linkis-dist/package/db/linkis_ddl.sql b/linkis-dist/package/db/linkis_ddl.sql index 3e90023a4d..9f9d800ceb 100644 --- a/linkis-dist/package/db/linkis_ddl.sql +++ b/linkis-dist/package/db/linkis_ddl.sql @@ -751,6 +751,8 @@ CREATE TABLE `linkis_cg_manager_service_instance` ( `mark` varchar(32) COLLATE utf8_bin DEFAULT NULL, `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL, `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL, + `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL, + `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL, `update_time` datetime DEFAULT CURRENT_TIMESTAMP, `create_time` datetime DEFAULT CURRENT_TIMESTAMP, `updator` varchar(32) COLLATE utf8_bin DEFAULT NULL, diff --git a/linkis-dist/package/db/linkis_ddl_pg.sql b/linkis-dist/package/db/linkis_ddl_pg.sql index 7dceba602a..5ce8fce319 100644 --- a/linkis-dist/package/db/linkis_ddl_pg.sql +++ b/linkis-dist/package/db/linkis_ddl_pg.sql @@ -804,6 +804,8 @@ CREATE TABLE linkis_cg_manager_service_instance ( mark varchar(32) NULL, identifier varchar(32) NULL, ticketId varchar(255) NULL DEFAULT NULL, + mapping_host varchar(128) NULL DEFAULT NULL, + mapping_ports varchar(128) NULL DEFAULT NULL, update_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP, create_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP, updator varchar(32) NULL, diff --git a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql index 4ac4c8cc5c..f58e190077 100644 --- a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql +++ b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql @@ -95,6 +95,10 @@ CREATE TABLE `linkis_ps_python_module_info` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN mapping_ports varchar(128); +ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN mapping_host varchar(128); + + diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 1768b77d04..f70e557066 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -309,6 +309,14 @@ public void setConf(Map conf) { this.conf = conf; } + public void addAllConf(Map conf) { + if (this.conf == null) { + setConf(conf); + } else { + this.conf.putAll(conf); + } + } + public String getPropertiesFile() { return propertiesFile; } diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java index 9c753d862f..d65cb03c5d 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java @@ -69,6 +69,9 @@ public void deployCluster(String mainClass, String args, Map con addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal()); addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab()); addSparkArg(sparkLauncher, "--queue", sparkConfig.getQueue()); + sparkConfig + .getConf() + .forEach((key, value) -> addSparkArg(sparkLauncher, "--conf", key + "=" + value)); sparkLauncher.setAppResource(sparkConfig.getAppResource()); sparkLauncher.setMainClass(mainClass); Arrays.stream(args.split("\\s+")) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java new file mode 100644 index 0000000000..71a0776de3 --- /dev/null +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.hooks; + +import org.apache.linkis.engineconn.common.creation.EngineCreationContext; +import org.apache.linkis.engineconn.common.engineconn.EngineConn; +import org.apache.linkis.engineconn.common.hook.EngineConnHook; +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SparkContainerizationEngineConnHook implements EngineConnHook { + private static final Logger logger = + LoggerFactory.getLogger(SparkContainerizationEngineConnHook.class); + + @Override + public void beforeCreateEngineConn(EngineCreationContext engineCreationContext) { + Map options = engineCreationContext.getOptions(); + String mappingHost = + SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST().getValue(options); + String mappingPorts = + SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS().getValue(options); + List mappingPortList = + Arrays.stream(mappingPorts.trim().split(",")) + .filter(StringUtils::isNoneEmpty) + .collect(Collectors.toList()); + + if (mappingPortList.size() == 2) { + logger.info( + "加载spark容器化配置, spark.driver.host={}, spark.driver.port={}, spark.driver.blockManager.port={}", + mappingHost, + mappingPortList.get(0), + mappingPortList.get(1)); + options.put(SparkConfiguration.SPARK_DRIVER_HOST().key(), mappingHost); + options.put( + SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().key(), + SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().defaultValue()); + options.put(SparkConfiguration.SPARK_DRIVER_PORT().key(), mappingPortList.get(0)); + options.put( + SparkConfiguration.SPARK_DRIVER_BLOCK_MANAGER_PORT().key(), mappingPortList.get(1)); + } + } + + @Override + public void beforeExecutionExecute( + EngineCreationContext engineCreationContext, EngineConn engineConn) {} + + @Override + public void afterExecutionExecute( + EngineCreationContext engineCreationContext, EngineConn engineConn) {} + + @Override + public void afterEngineServerStartFailed( + EngineCreationContext engineCreationContext, Throwable throwable) {} + + @Override + public void afterEngineServerStartSuccess( + EngineCreationContext engineCreationContext, EngineConn engineConn) {} +} diff --git a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties index a535e31ea0..c05631eac5 100644 --- a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties @@ -24,7 +24,7 @@ wds.linkis.engineconn.debug.enable=true wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.SparkEngineConnPlugin -wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook +wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook,org.apache.linkis.engineplugin.spark.hooks.SparkContainerizationEngineConnHook linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088 diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index 9fea6ec70d..9aba046654 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -211,6 +211,20 @@ object SparkConfiguration extends Logging { val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY = "spark.sparkmeasure.outputFilename" + val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST = + CommonVars("linkis.engine.containerization.mapping.host", "") + + val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS = + CommonVars("linkis.engine.containerization.mapping.ports", "") + + val SPARK_DRIVER_HOST = CommonVars[String]("spark.driver.host", "") + + val SPARK_DRIVER_PORT = CommonVars[String]("spark.driver.port", "") + + val SPARK_DRIVER_BIND_ADDRESS = CommonVars[String]("spark.driver.bindAddress", "0.0.0.0") + + val SPARK_DRIVER_BLOCK_MANAGER_PORT = CommonVars[String]("spark.driver.blockManager.port", "") + private def getMainJarName(): String = { val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory]) if (somePath.isDefined) { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index ad9786dff0..9e71c76531 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -137,6 +137,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options)) sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options)) + val conf = new util.HashMap[String, String]() + addSparkConf(conf, SPARK_DRIVER_HOST.key, SPARK_DRIVER_HOST.getValue(options)) + addSparkConf(conf, SPARK_DRIVER_PORT.key, SPARK_DRIVER_PORT.getValue(options)) + addSparkConf(conf, SPARK_DRIVER_BIND_ADDRESS.key, SPARK_DRIVER_BIND_ADDRESS.getValue(options)) + addSparkConf( + conf, + SPARK_DRIVER_BLOCK_MANAGER_PORT.key, + SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options) + ) + + sparkConfig.addAllConf(conf) + logger.info(s"spark_info: ${sparkConfig}") sparkConfig } @@ -185,6 +197,19 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging // when spark version is greater than or equal to 1.5.0 if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true") + addSparkConf(sparkConf, SPARK_DRIVER_HOST.key, SPARK_DRIVER_HOST.getValue(options)) + addSparkConf(sparkConf, SPARK_DRIVER_PORT.key, SPARK_DRIVER_PORT.getValue(options)) + addSparkConf( + sparkConf, + SPARK_DRIVER_BIND_ADDRESS.key, + SPARK_DRIVER_BIND_ADDRESS.getValue(options) + ) + addSparkConf( + sparkConf, + SPARK_DRIVER_BLOCK_MANAGER_PORT.key, + SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options) + ) + val outputDir = createOutputDir(sparkConf) logger.info( @@ -215,6 +240,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging SparkEngineSession(sc, sqlContext, sparkSession, outputDir) } + private def addSparkConf(conf: SparkConf, key: String, value: String): Unit = { + if (StringUtils.isNotEmpty(value)) { + conf.set(key, value) + } + } + + private def addSparkConf(conf: JMap[String, String], key: String, value: String): Unit = { + if (StringUtils.isNotEmpty(value)) { + conf.put(key, value) + } + } + def createSparkSession( outputDir: File, conf: SparkConf,