Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][PS-DataSource] DataSource supports merging into a separate service startup #4934

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ object RPCConfiguration {
"cs,contextservice,data-source-manager,metadataQuery,metadatamanager,query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource,basedata-manager"
).getValue.split(",")

val METADATAQUERY_SERVICE_APPLICATION_NAME: CommonVars[String] =
CommonVars("wds.linkis.gateway.conf.publicservice.name", "linkis-ps-metadataquery")

val METADATAQUERY_SERVICE_LIST: Array[String] = CommonVars(
"wds.linkis.gateway.conf.metadataquery.list",
"metadatamanager,metadataquery"
).getValue.split(",")

val LINKIS_MANAGER_SERVICE_NAME: CommonVars[String] =
CommonVars("wds.linkis.gateway.conf.linkismanager.name", "linkis-cg-linkismanager")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import org.apache.linkis.common.errorcode.LinkisErrorCode;

public enum LinkisGwAuthenticationErrorCodeSummary implements LinkisErrorCode {
TOKEN_IS_NULL(15205, "token is null(token 令牌为空)!"),
FAILED_TO_LOAD_TOKEN(15200, "Failed to load token from DB into cache(无法将 token 令牌从数据库加载到缓存中)!"),
TOKEN_VALID_OR_STALE(15201, "Token is not valid or stale(token 令牌无效或已过期)!"),
ILLEGAL_TOKENUSER(15202, "Illegal TokenUser for Token(Token非法用户)!"),
ILLEGAL_HOST(15203, "Illegal Host for Token(Token非法主机)!"),
INVALID_TOKEN(15204, "Invalid Token(令牌无效)");
FAILED_TO_LOAD_TOKEN(
15200,
"Failed to load token:{0} from DB into cache(无法将 Token:{0} 令牌从数据库加载到缓存中),Caused by:{1}"),
TOKEN_IS_EXPIRED(15201, "Token is not valid or stale({0} 令牌已过期)!"),
ILLEGAL_TOKENUSER(15202, "Illegal TokenUser for Token(Token非法用户: {0})!"),
ILLEGAL_HOST(15203, "Illegal Host for Token(非法ip: {0})!"),
INVALID_TOKEN(15204, "Invalid Token(数据库中未配置的无效令牌)"),
TOKEN_IS_NULL(15205, "token is null({0} 令牌参数为空)!"),
FAILED_TO_BAD_SQLGRAMMAR(
15206, "Failed to query token:{0} data(Token:{0} 数据查询失败), Caused by:{1}"),
NOT_EXIST_DB(15207, "Token:{0} does not exist in the table(Token:{0} 表中不存在)!, Caused by:{1}");

/** (errorCode)错误码 */
private final int errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@

<sql id="tableName">linkis_mg_gateway_auth_token</sql>

<select id="selectTokenByName" parameterType="java.lang.String" resultMap="tokenEntityMap">
SELECT * FROM
<select id="selectTokenByName" parameterType="String" resultMap="tokenEntityMap">
select * from
<include refid="tableName"/>
<where>
token_name = #{tokenName}
</where>
</select>

<select id="getAllTokens" resultMap="tokenEntityMap">
SELECT * FROM
select * from
<include refid="tableName"/>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ import org.apache.linkis.gateway.authentication.bo.impl.TokenImpl
import org.apache.linkis.gateway.authentication.conf.TokenConfiguration
import org.apache.linkis.gateway.authentication.dao.TokenDao
import org.apache.linkis.gateway.authentication.entity.TokenEntity
import org.apache.linkis.gateway.authentication.errorcode.LinkisGwAuthenticationErrorCodeSummary
import org.apache.linkis.gateway.authentication.errorcode.LinkisGwAuthenticationErrorCodeSummary._
import org.apache.linkis.gateway.authentication.exception.{
TokenAuthException,
TokenNotExistException
}
import org.apache.linkis.gateway.authentication.exception.TokenNotExistException

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

import java.text.MessageFormat
import java.util.concurrent.{ExecutionException, TimeUnit}

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.util.concurrent.UncheckedExecutionException

@Service
class CachedTokenService extends TokenService {
Expand All @@ -59,9 +61,9 @@ class CachedTokenService extends TokenService {

});

// def setTokenDao(tokenDao: TokenDao): Unit = {
// this.tokenDao = tokenDao
// }
// def setTokenDao(tokenDao: TokenDao): Unit = {
// this.tokenDao = tokenDao
// }

/*
TODO begin
Expand Down Expand Up @@ -104,28 +106,41 @@ class CachedTokenService extends TokenService {

private def loadTokenFromCache(tokenName: String): Token = {
if (tokenName == null) {
throw new TokenAuthException(TOKEN_IS_NULL.getErrorCode, TOKEN_IS_NULL.getErrorDesc)
throw new TokenAuthException(
TOKEN_IS_NULL.getErrorCode,
MessageFormat.format(TOKEN_IS_NULL.getErrorDesc, tokenName)
)
}
Utils.tryCatch(tokenCache.get(tokenName))(t =>
t match {
case x: ExecutionException =>
x.getCause match {
case _: TokenNotExistException => null
case _ =>
throw new TokenAuthException(
FAILED_TO_LOAD_TOKEN.getErrorCode,
FAILED_TO_LOAD_TOKEN.getErrorDesc
)
case e: TokenNotExistException =>
throwTokenAuthException(NOT_EXIST_DB, tokenName, e)
case e =>
throwTokenAuthException(FAILED_TO_LOAD_TOKEN, tokenName, e)
}
case _ =>
throw new TokenAuthException(
FAILED_TO_LOAD_TOKEN.getErrorCode,
FAILED_TO_LOAD_TOKEN.getErrorDesc
)
case e: UncheckedExecutionException =>
throwTokenAuthException(FAILED_TO_BAD_SQLGRAMMAR, tokenName, e)
case e =>
throwTokenAuthException(FAILED_TO_LOAD_TOKEN, tokenName, e)
}
)
}

private def throwTokenAuthException(
gwAuthenticationErrorCodeSummary: LinkisGwAuthenticationErrorCodeSummary,
tokenName: String,
e: Throwable
) = {
val exception = new TokenAuthException(
gwAuthenticationErrorCodeSummary.getErrorCode,
MessageFormat.format(gwAuthenticationErrorCodeSummary.getErrorDesc, tokenName, e.getMessage)
)
exception.initCause(e)
throw exception
}

private def isTokenAcceptableWithUser(token: Token, userName: String): Boolean = {
token != null && !token.isStale() && token.isUserLegal(userName)
}
Expand Down Expand Up @@ -153,20 +168,27 @@ class CachedTokenService extends TokenService {
override def doAuth(tokenName: String, userName: String, host: String): Boolean = {
val tmpToken: Token = loadTokenFromCache(tokenName)
var ok: Boolean = true
// token expired
if (!isTokenValid(tmpToken)) {
ok = false
throw new TokenAuthException(
TOKEN_VALID_OR_STALE.getErrorCode,
TOKEN_VALID_OR_STALE.getErrorDesc
TOKEN_IS_EXPIRED.getErrorCode,
MessageFormat.format(TOKEN_IS_EXPIRED.getErrorDesc, tokenName)
)
}
if (!isTokenAcceptableWithUser(tmpToken, userName)) {
ok = false
throw new TokenAuthException(ILLEGAL_TOKENUSER.getErrorCode, ILLEGAL_TOKENUSER.getErrorDesc)
throw new TokenAuthException(
ILLEGAL_TOKENUSER.getErrorCode,
MessageFormat.format(ILLEGAL_TOKENUSER.getErrorDesc, userName)
)
}
if (!isTokenAcceptableWithHost(tmpToken, host)) {
ok = false
throw new TokenAuthException(ILLEGAL_HOST.getErrorCode, ILLEGAL_HOST.getErrorDesc)
throw new TokenAuthException(
ILLEGAL_HOST.getErrorCode,
MessageFormat.format(ILLEGAL_HOST.getErrorDesc, host)
)
}
ok
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class DefaultGatewayParser(gatewayParsers: Array[GatewayParser]) extends Abstrac
gatewayContext.getGatewayRoute.setRequestURI(path)
}
gatewayParsers.foreach(_.parse(gatewayContext))

/**
* Gateway forwarding logic: PublicService Service exists and is effective And then judge
* metadataquery Service, Continue to judge linkismanager Service, Final judgment
* linkispsdatasource Service
*/
if (gatewayContext.getGatewayRoute.getServiceInstance == null) path match {
case CLIENT_HEARTBEAT_REGEX(version) =>
if (sendResponseWhenNotMatchVersion(gatewayContext, version)) return
Expand All @@ -122,10 +128,11 @@ class DefaultGatewayParser(gatewayParsers: Array[GatewayParser]) extends Abstrac
) {
RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue
// In order to be compatible with metadata module name refactoring,this logic will be removed in subsequent versions
} else if (RPCConfiguration.METADATAQUERY_SERVICE_LIST.contains(serviceId)) {
RPCConfiguration.METADATAQUERY_SERVICE_APPLICATION_NAME.getValue
} else if (RPCConfiguration.LINKIS_DATASOURCE_SERVICE_LIST.contains(serviceId)) {
RPCConfiguration.LINKIS_DATASOURCE_SERVICE_NAME.getValue
} else if (RPCConfiguration.LINKIS_MANAGER_SERVICE_LIST.contains(serviceId)) {
RPCConfiguration.LINKIS_MANAGER_SERVICE_NAME.getValue
// After the complete merge is completed, it needs to be removed
} else {
serviceId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object TokenAuthentication extends Logging {
})
if (ok) {
logger.info(
s"Token authentication succeed, uri: ${gatewayContext.getRequest.getRequestURI}, token: $token, tokenUser: $tokenUser."
s"Token authentication succeed, uri: ${gatewayContext.getRequest.getRequestURI}, token: $token, tokenUser: $tokenUser, host: $host."
)
if (login) {
logger.info(
Expand All @@ -115,6 +115,9 @@ object TokenAuthentication extends Logging {
}
true
} else {
logger.info(
s"Token authentication fail, uri: ${gatewayContext.getRequest.getRequestURI}, token: $token, tokenUser: $tokenUser, host: $host."
)
SecurityFilter.filterResponse(gatewayContext, authMsg)
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ import org.apache.linkis.manager.label.entity.route.RouteLabel
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.utils.ZuulEntranceUtils
import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache
import org.apache.linkis.server.{toScalaBuffer, BDPJettyServerHelper}
import org.apache.linkis.server.BDPJettyServerHelper

import org.springframework.stereotype.Component

import java.util
import java.util.Locale

import scala.collection.JavaConverters._

@Component
class DSSGatewayParser extends AbstractGatewayParser {

Expand Down Expand Up @@ -160,7 +162,7 @@ class DSSGatewayParser extends AbstractGatewayParser {
logger.info(
"Get ServiceName From Label and method is " + gatewayContext.getRequest.getMethod.toString + ",and urlLabels is " + requestUrlLabels
)
val requestMethod = gatewayContext.getRequest.getMethod.toLowerCase(Locale.ROOT)
val requestMethod = gatewayContext.getRequest.getMethod.toLowerCase(Locale.getDefault())
if (
requestUrlLabels == null && (requestMethod
.equals("post") || requestMethod.equals("put") || requestMethod.equals("delete"))
Expand All @@ -179,15 +181,15 @@ class DSSGatewayParser extends AbstractGatewayParser {
case map: util.Map[String, Any] => labelBuilderFactory.getLabels(map.asInstanceOf)
case _ => new util.ArrayList[Label[_]]()
}
labels
labels.asScala
.filter(label => label.isInstanceOf[RouteLabel])
.foreach(label => {
routeLabelList.add(label.asInstanceOf[RouteLabel])
})

case _ => null
}
val labelNameList = routeLabelList.map(routeLabel => routeLabel.getStringValue).toList
val labelNameList = routeLabelList.asScala.map(routeLabel => routeLabel.getStringValue).toList
if (labelNameList != null && labelNameList.size > 0) {
genServiceNameByDSSLabel(labelNameList, tmpServiceName)
} else if (null != requestUrlLabels) {
Expand Down Expand Up @@ -239,7 +241,7 @@ class DSSGatewayParser extends AbstractGatewayParser {
): Option[String] = {
val findIt: (String => Boolean) => Option[String] = op => {
val services =
SpringCloudFeignConfigurationCache.getDiscoveryClient.getServices.filter(op).toList
SpringCloudFeignConfigurationCache.getDiscoveryClient.getServices.asScala.filter(op).toList
if (services.length == 1) Some(services.head)
else if (services.length > 1) tooManyDeal(services)
else None
Expand All @@ -250,7 +252,7 @@ class DSSGatewayParser extends AbstractGatewayParser {
val findMostCorrect: (String => (String, Int)) => Option[String] = { op =>
{
val serviceMap =
SpringCloudFeignConfigurationCache.getDiscoveryClient.getServices.map(op).toMap
SpringCloudFeignConfigurationCache.getDiscoveryClient.getServices.asScala.map(op).toMap
var count = 0
var retService: Option[String] = None
serviceMap.foreach { case (k, v) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.gateway.springcloud.handler;

import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
import org.springframework.core.annotation.MergedAnnotation;
import org.springframework.core.annotation.MergedAnnotations;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ResponseStatusException;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;

@Component
public class CustomErrorAttributes extends DefaultErrorAttributes {

@Override
public Map<String, Object> getErrorAttributes(
ServerRequest request, ErrorAttributeOptions options) {
Throwable throwable = this.getError(request);
MergedAnnotation<ResponseStatus> responseStatusAnnotation =
MergedAnnotations.from(
throwable.getClass(), MergedAnnotations.SearchStrategy.TYPE_HIERARCHY)
.get(ResponseStatus.class);
HttpStatus errorStatus = determineHttpStatus(throwable, responseStatusAnnotation);
Map<String, Object> map = new HashMap<>();
map.put("method", request.path());
map.put("status", errorStatus.value());
String msg = errorStatus.getReasonPhrase();
if (errorStatus.value() >= HttpStatus.INTERNAL_SERVER_ERROR.value()) {
msg = msg + ", with request path:" + request.path();
}
map.put("message", msg);
map.put("data", Lists.newArrayList());

return map;
}

private HttpStatus determineHttpStatus(
Throwable error, MergedAnnotation<ResponseStatus> responseStatusAnnotation) {
if (error instanceof ResponseStatusException) {
return ((ResponseStatusException) error).getStatus();
}
return responseStatusAnnotation
.getValue("code", HttpStatus.class)
.orElse(HttpStatus.INTERNAL_SERVER_ERROR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private Mono<Void> gatewayDeal(
if (serviceInstance != null) {
logger.info(
"Client request ip: "
+ gatewayContext.getRequest().getRemoteAddress()
+ gatewayContext.getRequest().getRequestRealIpAddr()
+ " and uri: "
+ gatewayContext.getRequest().getRequestURI()
+ "GatewayRouter route requestUri: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class GatewayWebSocketSessionConnection(
}
}

def getAddress: InetSocketAddress = webSocketSession.getHandshakeInfo.getRemoteAddress;
// todo
def getAddress: InetSocketAddress = null

def getProxyWebSocketSession(
serviceInstance: ServiceInstance
Expand Down Expand Up @@ -191,7 +192,8 @@ class GatewayWebSocketSession private (

protected var webSocketConnection: WebSocketConnection = _

def isAlive: Boolean = !webSocketConnection.getInbound.receiveCloseStatus().subscribe().isDisposed
// todo
def isAlive: Boolean = true

override def receive(): Flux[WebSocketMessage] = webSocketConnection.getInbound
.aggregateFrames(ServerConfiguration.BDP_SERVER_SOCKET_TEXT_MESSAGE_SIZE_MAX.getValue.toInt)
Expand Down
Loading