Skip to content

Commit

Permalink
Merge pull request #15 from jiaboy20/master
Browse files Browse the repository at this point in the history
merge master to dev-fide
  • Loading branch information
jiaboy20 authored Nov 30, 2023
2 parents c3ad3f5 + 2347ae8 commit 5bdd4a4
Show file tree
Hide file tree
Showing 249 changed files with 14,101 additions and 1,444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public enum StorageErrorCode {
FS_NOT_INIT(53001, "please init first"),

INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
FS_OOM(53002, "OOM occurred while reading the file");
FS_OOM(53002, "OOM occurred while reading the file"),

FS_ERROR(53003, "Failed to operation fs");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <K extends MetaData, V extends Record> ResultSetReader getResultSe
return new StorageResultSetReader<>(resultSet, value);
}

public static ResultSetReader getResultSetReader(String res) {
public static ResultSetReader getResultSetReader(String res) throws IOException {
ResultSetFactory rsFactory = ResultSetFactory.getInstance();
if (rsFactory.isResultSet(res)) {
ResultSet<? extends MetaData, ? extends Record> resultSet = rsFactory.getResultSet(res);
Expand All @@ -58,21 +58,12 @@ public static ResultSetReader getResultSetReader(String res) {
FsPath resPath = new FsPath(res);
ResultSet<? extends MetaData, ? extends Record> resultSet =
rsFactory.getResultSetByPath(resPath);
try {
FSFactory.getFs(resPath).init(null);
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs init failed", e);
}
ResultSetReader reader = null;
try {
reader =
ResultSetReaderFactory.getResultSetReader(
resultSet, FSFactory.getFs(resPath).read(resPath));
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs read failed", e);
}
Fs fs = FSFactory.getFs(resPath);
fs.init(null);
ResultSetReader reader =
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
if (reader instanceof StorageResultSetReader) {
((StorageResultSetReader<?, ?>) reader).setFs(FSFactory.getFs(resPath));
((StorageResultSetReader<?, ?>) reader).setFs(fs);
}
return (StorageResultSetReader<?, ?>) reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
public static Record[] getRecordByWriter(
org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
writer,
long limit) {
long limit)
throws IOException {
String res = writer.toString();
return getRecordByRes(res, limit);
}

public static Record[] getRecordByRes(String res, long limit) {
public static Record[] getRecordByRes(String res, long limit) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
int count = 0;
List<Record> records = new ArrayList<>();
try {
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
} catch (IOException e) {
logger.warn("ResultSetWriter getRecordByRes failed", e);
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
return records.toArray(new Record[0]);
}

public static Record getLastRecordByRes(String res) {
public static Record getLastRecordByRes(String res) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
Record record = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.linkis.storage.*;
import org.apache.linkis.storage.conf.*;
import org.apache.linkis.storage.domain.*;
import org.apache.linkis.storage.exception.StorageErrorException;
import org.apache.linkis.storage.utils.*;

import org.apache.commons.io.IOUtils;
Expand All @@ -37,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

public class StorageResultSetWriter<K extends MetaData, V extends Record>
extends ResultSetWriter<K, V> {
private static final Logger logger = LoggerFactory.getLogger(StorageResultSetWriter.class);
Expand Down Expand Up @@ -98,8 +101,9 @@ public void createNewFile() {
fs.init(null);
FileSystemUtils.createNewFile(storePath, proxyUser, true);
outputStream = fs.write(storePath, true);
} catch (IOException e) {
logger.warn("StorageResultSetWriter createNewFile failed", e);
} catch (Exception e) {
throw new StorageErrorException(
FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e);
}
logger.info("Succeed to create a new file:{}", storePath);
fileCreated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ record -> {
if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
return "";
} else {
return nullValue;
return emptyValue;
}
} else if (r instanceof Double) {
return StorageUtils.doubleToString((Double) r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExi
createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists);
}

public static void createNewFile(
FsPath filePath, String user, boolean createParentWhenNotExists) {
public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user);
try {
fileSystem.init(null);
createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists);
} catch (IOException e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} catch (Exception e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} finally {
IOUtils.closeQuietly(fileSystem);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,8 @@ public static byte[] mergeByteArrays(byte[] arr1, byte[] arr2) {
System.arraycopy(arr2, 0, mergedArray, arr1.length, arr2.length);
return mergedArray;
}

public static boolean isHDFSPath(FsPath fsPath) {
return HDFS.equalsIgnoreCase(fsPath.getFsType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.governance.common.constant;

public class CodeConstants {
// will auto append at end of scala code; make sure the last line is not a comment
public static String SCALA_CODE_AUTO_APPEND_CODE = "val linkisVar=123";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.linkis.governance.common.paser

import org.apache.linkis.common.utils.{CodeAndRunTypeUtils, Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.CodeConstants
import org.apache.linkis.governance.common.paser.CodeType.CodeType

import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -116,7 +117,7 @@ class ScalaCodeParser extends SingleCodeParser with Logging {
if (statementBuffer.nonEmpty) codeBuffer.append(statementBuffer.mkString("\n"))
// Make sure the last line is not a comment
codeBuffer.append("\n")
codeBuffer.append("val linkisVar=123")
codeBuffer.append(CodeConstants.SCALA_CODE_AUTO_APPEND_CODE)
codeBuffer.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,15 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
.toString

var springConf = Map("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")

var springConf =
Map[String, String]("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")
val properties =
PortUtils.readFromProperties(Configuration.getLinkisHome + "/conf/version.properties")
if (StringUtils.isNotBlank(properties.getProperty("version"))) {
springConf += ("eureka.instance.metadata-map.linkis.app.version" -> properties.getProperty(
"version"
))
}
request.creationDesc.properties.asScala.filter(_._1.startsWith("spring.")).foreach {
case (k, v) =>
springConf = springConf + (k -> v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.linkis.ecm.core.utils

import org.apache.linkis.common.utils.Utils
import org.apache.linkis.common.utils.{Logging, Utils}

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils

import java.io.IOException
import java.io.{BufferedReader, FileReader, IOException}
import java.net.ServerSocket
import java.util.Properties

object PortUtils {
object PortUtils extends Logging {

/**
* portRange: '-' is the separator
Expand Down Expand Up @@ -62,4 +63,23 @@ object PortUtils {
Utils.tryFinally(socket.getLocalPort)(IOUtils.closeQuietly(socket))
}

def readFromProperties(propertiesFile: String): Properties = {
val properties: Properties = new Properties
var reader: BufferedReader = null;
try {
reader = new BufferedReader(new FileReader(propertiesFile))
properties.load(reader)
} catch {
case e: Exception =>
logger.warn(s"loading vsersion faild with path $propertiesFile error:$e")
} finally {
try if (reader != null) reader.close
catch {
case e: Exception =>
logger.warn(s"try to close buffered reader with error:${e.getMessage}")
}
}
properties
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ public enum EngineconnServerErrorCodeSummary implements LinkisErrorCode {
11110,
"the parameters of engineConnInstance and ticketId are both not exists.(engineConnInstance 和ticketId 的参数都不存在.)"),
LOG_IS_NOT_EXISTS(11110, "Log directory {0} does not exists.(日志目录 {0} 不存在.)"),
FAILED_TO_DOWNLOAD(911115, "failed to downLoad(下载失败)");
FAILED_TO_DOWNLOAD(911115, "failed to downLoad(下载失败)"),
FILE_IS_OVERSIZE(911116, "Download file has exceeded 100MB(下载文件已超过100M)"),
PARAMETER_NOT_NULL(911117, "Parameter {0} cannot be empty (参数 {0} 不能为空)"),
LOGTYPE_ERROR(
911118,
"logType only supports stdout, stderr, gc, yarnApp(logType仅支持stdout,stderr,gc,yarnApp)"),
NOT_PERMISSION(
911119, "You {0} have no permission to download Log in ECM {1}(用户 {0} 无权限下载 ECM {1} 日志)"),
;

/** (errorCode)错误码 */
private final int errorCode;
Expand Down
Loading

0 comments on commit 5bdd4a4

Please sign in to comment.