Skip to content

Commit

Permalink
Flink supports udf function (apache#5067)
Browse files Browse the repository at this point in the history
* Flink supports udf function

* add flink udf hook

* Code optimization
  • Loading branch information
ChengJie1053 authored Jan 12, 2024
1 parent be7b820 commit 6736fed
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
public static final long CLIENT_REQUEST_TIMEOUT =
FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong();

protected final ExecutionContext executionContext;
public final ExecutionContext executionContext;
// jobId is not null only after job is submitted
private JobID jobId;
protected ApplicationId clusterID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext con
context, call.operands[0], Boolean.parseBoolean(call.operands[1]));
break;
case CREATE_TABLE:
case CREATE_FUNCTION:
case DROP_TABLE:
case ALTER_TABLE:
case CREATE_CATALOG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private String getExceptionMsg() {
case CREATE_TABLE:
actionMsg = "create a table";
break;
case CREATE_FUNCTION:
actionMsg = "create a function";
break;
case CREATE_DATABASE:
actionMsg = "create a database";
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public enum SqlCommand {

CREATE_DATABASE,

CREATE_FUNCTION,

ALTER_DATABASE,

DROP_DATABASE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ private Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner)
} else if (node instanceof SqlCreateDatabase) {
cmd = SqlCommand.CREATE_DATABASE;
operands = new String[] {stmt};
} else if (node instanceof SqlCreateFunction) {
cmd = SqlCommand.CREATE_FUNCTION;
operands = new String[] {stmt};
} else if (node instanceof SqlDropDatabase) {
cmd = SqlCommand.DROP_DATABASE;
operands = new String[] {stmt};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.client.utils;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkUdfUtils {

private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class);

private static final String CREATE_TEMP_FUNCTION_PATTERN =
"create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"";

private static final String CREATE_TEMP_FUNCTION_SQL =
"CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";

public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) {
logger.info("Flink udf start add pipeline classpaths, jar path: {}", path);

try {
Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration conf = (Configuration) configuration.get(env);

Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> map = (Map<String, Object>) confData.get(conf);
List<String> jarList = new ArrayList<>();
List<String> oldList =
conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList);
if (CollectionUtils.isNotEmpty(oldList)) {
jarList.addAll(oldList);
}
jarList.add(path);
map.put(PipelineOptions.CLASSPATHS.key(), jarList);
} catch (Exception e) {
logger.warn("Flink udf add pipeline classpaths failed", e);
}
}

public static void loadJar(String jarPath) {
logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath);

Method method = null;
Boolean accessible = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
accessible = method.isAccessible();

if (accessible == false) {
method.setAccessible(true);
}
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
method.invoke(classLoader, new URL(jarPath));

} catch (Exception e) {
logger.warn("Flink udf URLClassLoader loadJar failed", e);
} finally {
if (accessible != null) {
method.setAccessible(accessible);
}
}
}

public static String extractUdfClass(String statement) {
Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN);
Matcher matcher = pattern.matcher(statement);
if (matcher.find() && matcher.groupCount() >= 2) {
return matcher.group(2);
}
return "";
}

public static boolean isFlinkUdf(ClassLoader classLoader, String className) {
try {
Class<?> udfClass = classLoader.loadClass(className);
if (UserDefinedFunction.class.isAssignableFrom(udfClass)) {
return true;
}

} catch (ClassNotFoundException e) {
logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException: {}", className);
}
return false;
}

public static String generateFlinkUdfSql(String name, String className) {
return String.format(CREATE_TEMP_FUNCTION_SQL, name, className);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ wds.linkis.server.version=v1
wds.linkis.engineconn.debug.enable=true
#wds.linkis.keytab.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook
wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor(
with FlinkExecutor {

private var operation: JobOperation = _
private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _

override def init(): Unit = {
setCodeParser(new SQLCodeParser)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.hook

import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel, RunType}
import org.apache.linkis.udf.utils.ConstantVar
import org.apache.linkis.udf.vo.UDFInfoVo

import org.apache.commons.lang3.StringUtils

import scala.collection.JavaConverters.asScalaBufferConverter

class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
override val udfType: BigInt = ConstantVar.UDF_JAR
override val category: String = ConstantVar.UDF
override val runType = RunType.SQL

var labels: Array[Label[_]] = null

override protected def constructCode(udfInfo: UDFInfoVo): String = {
val path: String = udfInfo.getPath
val registerFormat: String = udfInfo.getRegisterFormat

if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) {
logger.warn("Flink udfInfo path or registerFormat cannot is empty")
return ""
}

val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
if (StringUtils.isBlank(udfClassName)) {
logger.warn("Flink extract udf class name cannot is empty")
return ""
}

FlinkUdfUtils.loadJar(path)

if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
logger.warn(
"There is no extends Flink UserDefinedFunction, skip loading flink udf: {} ",
path
)
return ""
}

val flinkUdfSql: String =
FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)

logger.info(
s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n"
)

if (labels != null && labels.nonEmpty) {
val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
executor match {
case computationExecutor: FlinkSQLComputationExecutor =>
FlinkUdfUtils.addFlinkPipelineClasspaths(
computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment,
path
)
case _ =>
}
}

"%sql\n" + flinkUdfSql
}

override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
val codeLanguageLabel = new CodeLanguageLabel
engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match {
case Some(engineTypeLabel) =>
codeLanguageLabel.setCodeType(
getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString
)
case None =>
codeLanguageLabel.setCodeType(runType.toString)
}
labels = Array[Label[_]](codeLanguageLabel)

super.afterExecutionExecute(engineCreationContext, engineConn)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
addPathToClassPath,
CLASS_PATH_SEPARATOR
}
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel}
import org.apache.linkis.manager.label.utils.LabelUtil

import java.util

import scala.collection.JavaConverters._

import com.google.common.collect.Lists

class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

override protected def getCommands(implicit
Expand Down Expand Up @@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

override protected def ifAddHiveConfigPath: Boolean = true

override protected def getEngineConnManagerHooks(implicit
engineConnBuildRequest: EngineConnBuildRequest
): java.util.List[String] = if (isOnceMode) {
super.getEngineConnManagerHooks(engineConnBuildRequest)
} else {
Lists.newArrayList("JarUDFLoadECMHook")
}

def isOnceMode: Boolean = {
val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.flink;

import org.apache.flink.table.functions.ScalarFunction;

public class LinkisFlinkUdfExample extends ScalarFunction {
public String eval(String str) {
return String.format("linkis flink udf test: %s", str);
}
}

0 comments on commit 6736fed

Please sign in to comment.