From 43cb5b2e307d51a5194135b0295176bb3f3bdc1b Mon Sep 17 00:00:00 2001 From: peacewong Date: Tue, 14 May 2024 15:50:11 +0800 Subject: [PATCH] The interface of basic data management should be judged by the administrator. (#5108) * Add administrator judgment * Add jdbc params * Add file path judgment * add throw Exception * code format code format * remove no use file * code optimize * use feign requestInterceptor to fixed server * add gateway ip priority load balancer * Fix Rpc bug * Support automatic retry for important RPC requests * Fix build issue * code format --- .../linkis/common/utils/SecurityUtils.java | 36 +++++ .../protocol/AbstractRetryableProtocol.java | 6 +- .../linkis/protocol/engine/EngineInfo.java | 47 ------- .../protocol/IRServiceGroupProtocol.scala | 27 ---- .../linkis/protocol/RetryableProtocol.scala | 6 +- .../linkis/protocol/UserWithCreator.scala} | 4 +- .../callback/LogCallbackProtocol.scala | 24 ---- .../protocol/engine/EngineCallback.scala | 35 ----- .../engine/EngineStateTransitionRequest.scala | 27 ---- .../protocol/engine/RequestEngineStatus.scala | 32 ----- .../engine/RequestUserEngineKill.scala | 34 ----- .../linkis/protocol/utils/ProtocolUtils.scala | 44 ------ .../engine/RequestEngineStatusTest.scala | 44 ------ .../engine/ResponseUserEngineKillTest.scala | 35 ----- .../protocol/utils/ProtocolUtilsTest.scala | 45 ------- .../linkis/rpc/conf/CacheManualRefresher.java | 22 --- .../linkis/rpc/conf/DynamicFeignClient.java | 126 ------------------ .../EurekaClientCacheManualRefresher.java | 118 ---------------- .../rpc/conf/FeignRequestInterceptor.java | 64 --------- .../conf/NacosClientCacheManualRefresher.java | 40 ------ .../linkis/rpc/constant/RpcConstant.java | 4 - .../errorcode/LinkisRpcErrorCodeSummary.java | 2 + .../org/apache/linkis/rpc/BaseRPCSender.scala | 49 ++----- .../apache/linkis/rpc/RPCReceiveRemote.scala | 15 +-- .../linkis/rpc/RPCSpringBeanCache.scala | 8 -- .../common/RetryableRPCInterceptor.scala | 10 -- .../rpc/sender/SpringMVCRPCSender.scala | 52 ++++++-- .../common/protocol/job/JobReqProcotol.scala | 3 +- .../common/protocol/task/RequestTask.scala | 3 +- .../protocol/task/ResponseEngineConnPid.scala | 3 +- .../protocol/task/ResponseTaskExecute.scala | 7 +- .../service/TaskExecutionServiceImpl.scala | 3 +- .../resource/EngineResourceRequest.scala | 2 +- .../impl/EnginePluginAdminServiceImpl.java | 11 ++ .../manager/am/exception/AMErrorCode.java | 3 +- .../engineplugin/jdbc/ConnectionManager.java | 1 + .../query/service/mysql/SqlConnection.java | 5 +- .../ConfigurationTemplateRestfulApi.java | 15 ++- .../restful/DatasourceAccessRestfulApi.java | 10 +- .../restful/DatasourceEnvRestfulApi.java | 7 +- .../restful/DatasourceTypeKeyRestfulApi.java | 8 +- .../restful/DatasourceTypeRestfulApi.java | 9 +- .../restful/GatewayAuthTokenRestfulApi.java | 22 ++- .../StaticAuthenticationStrategy.scala | 20 ++- .../response/DWSAuthenticationResult.scala | 5 + .../http/IpPriorityLoadBalancer.java | 100 +++----------- ...LinkisLoadBalancerClientConfiguration.java | 10 +- 47 files changed, 229 insertions(+), 974 deletions(-) delete mode 100644 linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala rename linkis-commons/{linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java => linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala} (89%) delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java rename linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java => linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java (51%) rename {linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer => linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http}/LinkisLoadBalancerClientConfiguration.java (77%) diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java index 0278b3337e..af163a6494 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java @@ -320,4 +320,40 @@ private static boolean isNotSecurity(String key, String value, String param) { return key.toLowerCase().contains(param.toLowerCase()) || value.toLowerCase().contains(param.toLowerCase()); } + + /** + * allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false + * + * @return + */ + public static Properties getMysqlSecurityParams() { + Properties properties = new Properties(); + properties.setProperty("allowLoadLocalInfile", "false"); + properties.setProperty("autoDeserialize", "false"); + properties.setProperty("allowLocalInfile", "false"); + properties.setProperty("allowUrlInLocalInfile", "false"); + return properties; + } + + /** + * Check if the path has a relative path + * + * @param path + * @return + */ + public static boolean containsRelativePath(String path) { + if (path.startsWith("./") + || path.contains("/./") + || path.startsWith("../") + || path.contains("/../")) { + return true; + } + if (path.startsWith(".\\") + || path.contains("\\.\\") + || path.startsWith("..\\") + || path.contains("\\..\\")) { + return true; + } + return false; + } } diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java index 3dfd166846..aa7ddece50 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java @@ -21,7 +21,7 @@ public class AbstractRetryableProtocol implements RetryableProtocol { @Override public long maxPeriod() { - return 3000L; + return 30000L; } @Override @@ -31,11 +31,11 @@ public Class[] retryExceptions() { @Override public int retryNum() { - return 2; + return 5; } @Override public long period() { - return 1000L; + return 10000L; } } diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java deleted file mode 100644 index 0504ee2113..0000000000 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine; - -public class EngineInfo { - - private Long id; - private EngineState engineState; - - public EngineInfo() {} - - public EngineInfo(Long id, EngineState state) { - this.id = id; - this.engineState = state; - } - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public EngineState getEngineState() { - return engineState; - } - - public void setEngineState(EngineState engineState) { - this.engineState = engineState; - } -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala deleted file mode 100644 index 675dc0c830..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol - -trait IRServiceGroupProtocol extends IRProtocol with InstanceProtocol { - val userWithCreator: UserWithCreator - - def user: String = userWithCreator.user - def creator: String = userWithCreator.creator -} - -case class UserWithCreator(user: String, creator: String) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala index 51509d6883..6ebee7d0e2 100644 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala @@ -18,8 +18,8 @@ package org.apache.linkis.protocol trait RetryableProtocol extends Protocol { - def retryNum: Int = 2 - def period: Long = 1000L - def maxPeriod: Long = 3000L + def retryNum: Int = 5 + def period: Long = 10000L + def maxPeriod: Long = 30000L def retryExceptions: Array[Class[_ <: Throwable]] = Array.empty[Class[_ <: Throwable]] } diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala similarity index 89% rename from linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java rename to linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala index 0c9aecf177..cebaf3b9b2 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala @@ -15,6 +15,6 @@ * limitations under the License. */ -package org.apache.linkis.rpc.loadbalancer; +package org.apache.linkis.protocol -public class GatewayLoadBalancerConfiguration {} +case class UserWithCreator(user: String, creator: String) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala deleted file mode 100644 index 0109472a90..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.callback - -import org.apache.linkis.protocol.message.RequestProtocol - -case class YarnAPPIdCallbackProtocol(nodeId: String, applicationId: String) extends RequestProtocol - -case class YarnInfoCallbackProtocol(nodeId: String, uri: String) extends RequestProtocol diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala deleted file mode 100644 index 8856d3a927..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -object EngineCallback { - private val DWC_APPLICATION_NAME = "dwc.application.name" - private val DWC_INSTANCE = "dwc.application.instance" - - def mapToEngineCallback(options: Map[String, String]): EngineCallback = - EngineCallback(options(DWC_APPLICATION_NAME), options(DWC_INSTANCE)) - - def callbackToMap(engineCallback: EngineCallback): Map[String, String] = - Map( - DWC_APPLICATION_NAME -> engineCallback.applicationName, - DWC_INSTANCE -> engineCallback.instance - ) - -} - -case class EngineCallback(applicationName: String, instance: String) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala deleted file mode 100644 index 9137001c14..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -case class EngineStateTransitionRequest(engineInstance: String, state: String) - -case class EngineStateTransitionResponse( - engineInstance: String, - state: String, - result: Boolean, - message: String -) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala deleted file mode 100644 index a4672aa4e5..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.apache.linkis.protocol.RetryableProtocol -import org.apache.linkis.protocol.message.RequestProtocol - -case class RequestEngineStatus(messageType: Int) extends RetryableProtocol with RequestProtocol - -object RequestEngineStatus { - val Status_Only = 1 - val Status_Overload = 2 - val Status_Concurrent = 3 - val Status_Overload_Concurrent = 4 - val Status_BasicInfo = 5 - val ALL = 6 -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala deleted file mode 100644 index beb7987b01..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.apache.linkis.protocol.message.RequestProtocol - -case class RequestUserEngineKill( - ticketId: String, - creator: String, - user: String, - properties: Map[String, String] -) extends RequestProtocol - -case class ResponseUserEngineKill(ticketId: String, status: String, message: String) - -object ResponseUserEngineKill { - val Success = "Success" - val Error = "Error" -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala deleted file mode 100644 index 1bb0791be3..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.utils - -import org.apache.linkis.common.conf.CommonVars - -object ProtocolUtils { - - val SERVICE_SUFFIX = CommonVars("wds.linkis.service.suffix", "engineManager,entrance,engine") - val suffixs = SERVICE_SUFFIX.getValue.split(",") - - /** - * Pass in moduleName to return the corresponding appName 传入moduleName返回对应的appName - * @param moduleName - * module's name - * @return - * application's name - */ - def getAppName(moduleName: String): Option[String] = { - val moduleNameLower = moduleName.toLowerCase() - for (suffix <- suffixs) { - if (moduleNameLower.contains(suffix.toLowerCase())) { - return Some(moduleNameLower.replace(suffix.toLowerCase(), "")) - } - } - None - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala deleted file mode 100644 index d9fc07b6c0..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class RequestEngineStatusTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val statusOnly = RequestEngineStatus.Status_Only - val statusOverload = RequestEngineStatus.Status_Overload - val statusConcurrent = RequestEngineStatus.Status_Concurrent - val statusOverloadConcurrent = RequestEngineStatus.Status_Overload_Concurrent - val statusBasicInfo = RequestEngineStatus.Status_BasicInfo - val all = RequestEngineStatus.ALL - - Assertions.assertTrue(1 == statusOnly) - Assertions.assertTrue(2 == statusOverload) - Assertions.assertTrue(3 == statusConcurrent) - Assertions.assertTrue(4 == statusOverloadConcurrent) - Assertions.assertTrue(5 == statusBasicInfo) - Assertions.assertTrue(6 == all) - - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala deleted file mode 100644 index dbf3f5e3b5..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class ResponseUserEngineKillTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val success = ResponseUserEngineKill.Success - val error = ResponseUserEngineKill.Error - - Assertions.assertEquals("Success", success) - Assertions.assertEquals("Error", error) - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala deleted file mode 100644 index 2435f51497..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.utils - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class ProtocolUtilsTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val serviceSuffix = ProtocolUtils.SERVICE_SUFFIX.getValue - val suffixs = ProtocolUtils.suffixs - - Assertions.assertNotNull(serviceSuffix) - Assertions.assertTrue(suffixs.length == 3) - } - - @Test - @DisplayName("getAppNameTest") - def getAppNameTest(): Unit = { - - val modeleName = "engineManager" - val appNameOption = ProtocolUtils.getAppName(modeleName) - Assertions.assertNotNull(appNameOption.get) - - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java deleted file mode 100644 index dbdf52a1fc..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -public interface CacheManualRefresher { - void refresh(); -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java deleted file mode 100644 index dd1f6a7dc9..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.linkis.DataWorkCloudApplication; - -import org.apache.commons.lang3.StringUtils; - -import org.springframework.cloud.openfeign.FeignClientBuilder; -import org.springframework.cloud.openfeign.FeignClientFactoryBean; -import org.springframework.stereotype.Component; - -import java.util.concurrent.ConcurrentHashMap; - -@Component -public class DynamicFeignClient { - - private FeignClientBuilder feignClientBuilder; - - private final ConcurrentHashMap CACHE_BEAN = new ConcurrentHashMap(); - - public DynamicFeignClient() { - this.feignClientBuilder = - new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext()); - } - - public T getFeignClient(final Class type, final String serviceName) { - return getFeignClient(type, serviceName, null); - } - - public T getFeignClient( - final Class type, final Class fallbackFactory, final String serviceName) { - return getFeignClient(type, fallbackFactory, serviceName, null); - } - - public T getFeignClient( - final Class type, - final FeignClientFactoryBean clientFactoryBean, - final String serviceName) { - return getFeignClient(type, clientFactoryBean, serviceName, null); - } - - public T getFeignClient(final Class type, String serviceName, final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotEmpty(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, serviceName); - if (StringUtils.isNotEmpty(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - public T getFeignClient( - final Class type, - final Class fallbackFactory, - final String serviceName, - final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotEmpty(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean(); - feignClientFactoryBean.setFallbackFactory(fallbackFactory); - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName); - if (StringUtils.isNotEmpty(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - public T getFeignClient( - final Class type, - final FeignClientFactoryBean clientFactoryBean, - final String serviceName, - final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotEmpty(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, clientFactoryBean, serviceName); - if (StringUtils.isNotEmpty(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - private T getFromCache(final String serviceName, final String serviceUrl) { - if (StringUtils.isNotEmpty(serviceUrl)) { - return CACHE_BEAN.get(serviceUrl); - } else { - return CACHE_BEAN.get(serviceName); - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java deleted file mode 100644 index 7394698672..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.commons.lang3.exception.ExceptionUtils; - -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.stereotype.Component; -import org.springframework.util.ReflectionUtils; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.time.Duration; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component -@ConditionalOnClass(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") -public class EurekaClientCacheManualRefresher implements CacheManualRefresher { - private static final Logger logger = - LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class); - private final AtomicBoolean isRefreshing = new AtomicBoolean(false); - private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor(); - private final String cacheRefreshTaskField = "cacheRefreshTask"; - private Object cacheRefreshTask; - - private long lastRefreshMillis = 0; - private final Duration refreshIntervalDuration = Duration.ofSeconds(3); - - @Autowired private BeanFactory beanFactory; - - public void refreshOnExceptions(Exception e, List> clazzs) { - if (null == clazzs || clazzs.size() == 0) { - throw new IllegalArgumentException(); - } - - if (clazzs.stream() - .anyMatch( - clazz -> clazz.isInstance(e) || clazz.isInstance(ExceptionUtils.getRootCause(e)))) { - refresh(); - } - } - - public void refresh() { - if (isRefreshing.compareAndSet(false, true)) { - refreshExecutor.execute( - () -> { - try { - if (System.currentTimeMillis() - <= lastRefreshMillis + refreshIntervalDuration.toMillis()) { - logger.warn( - "Not manually refresh eureka client cache as refresh interval was not exceeded:{}", - refreshIntervalDuration.getSeconds()); - return; - } - - String discoveryClientClassName = "com.netflix.discovery.DiscoveryClient"; - if (null == cacheRefreshTask) { - Class discoveryClientClass = Class.forName(discoveryClientClassName); - Field field = - ReflectionUtils.findField(discoveryClientClass, cacheRefreshTaskField); - if (null != field) { - ReflectionUtils.makeAccessible(field); - Object discoveryClient = beanFactory.getBean(discoveryClientClass); - cacheRefreshTask = ReflectionUtils.getField(field, discoveryClient); - } - } - - if (null == cacheRefreshTask) { - logger.error( - "Field ({}) not found in class '{}'", - cacheRefreshTaskField, - discoveryClientClassName); - return; - } - - lastRefreshMillis = System.currentTimeMillis(); - Class timedSupervisorTaskClass = - Class.forName("com.netflix.discovery.TimedSupervisorTask"); - Method method = timedSupervisorTaskClass.getDeclaredMethod("run"); - method.setAccessible(true); - method.invoke(cacheRefreshTask); - logger.info( - "Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())"); - } catch (Exception e) { - logger.error("An exception occurred when manually refresh eureka client cache", e); - } finally { - isRefreshing.set(false); - } - }); - } else { - logger.warn( - "Not manually refresh eureka client cache as another thread is refreshing it already"); - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java deleted file mode 100644 index ef1c2dd095..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.linkis.rpc.BaseRPCSender; -import org.apache.linkis.rpc.constant.RpcConstant; -import org.apache.linkis.server.BDPJettyServerHelper; -import org.apache.linkis.server.Message; -import org.apache.linkis.server.security.SSOUtils$; -import org.apache.linkis.server.security.SecurityFilter$; - -import org.springframework.stereotype.Component; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import feign.RequestInterceptor; -import feign.RequestTemplate; - -@Component -public class FeignRequestInterceptor implements RequestInterceptor { - - @Override - public void apply(RequestTemplate requestTemplate) { - Map> headers = new HashMap<>(requestTemplate.headers()); - headers.put( - RpcConstant.LINKIS_LOAD_BALANCER_TYPE, - Arrays.asList(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC)); - Tuple2 userTicketKV = - SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER()); - headers.put(userTicketKV._1, Arrays.asList(userTicketKV._2)); - try { - String body = - new String( - requestTemplate.body(), - org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue()); - Message message = BDPJettyServerHelper.gson().fromJson(body, Message.class); - headers.put( - RpcConstant.FIXED_INSTANCE, Arrays.asList(BaseRPCSender.getFixedInstanceInfo(message))); - requestTemplate.headers(headers); - } catch (UnsupportedEncodingException e) { - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java deleted file mode 100644 index db26cd0f2c..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.stereotype.Component; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component -@ConditionalOnClass(name = "com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration") -public class NacosClientCacheManualRefresher implements CacheManualRefresher { - private static final Logger logger = - LoggerFactory.getLogger(NacosClientCacheManualRefresher.class); - - public void refresh() { - try { - logger.warn("Failed to obtain nacos metadata. Wait 100 milliseconds"); - Thread.sleep(100L); - } catch (InterruptedException e) { - - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java index 3d46661de2..9fd0b81104 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java @@ -19,9 +19,5 @@ public class RpcConstant { - public static final String LINKIS_LOAD_BALANCER_TYPE = "LinkisLoadBalancerType"; - - public static final String LINKIS_LOAD_BALANCER_TYPE_RPC = "RPC"; - public static final String FIXED_INSTANCE = "client-ip"; } diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java index b87e730994..a8daece891 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java @@ -32,6 +32,8 @@ public enum LinkisRpcErrorCodeSummary implements LinkisErrorCode { 10051, "The instance:{0} of application {1} does not exist(应用程序:{0} 的实例:{1} 不存在)."), INSTANCE_ERROR(10052, "The instance:{0} is error should ip:port."), + + INSTANCE_NOT_FOUND_ERROR(10053, "The instance:{0} is not found."), RPC_INIT_ERROR(10054, "Asyn RPC Consumer Thread has stopped!(Asyn RPC Consumer 线程已停止!)"); /** 错误码 */ diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala index 1c4e43b3cc..149179f8b1 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala @@ -22,28 +22,27 @@ import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.exception.WarnException import org.apache.linkis.common.utils.Logging import org.apache.linkis.protocol.Protocol -import org.apache.linkis.rpc.conf.DynamicFeignClient import org.apache.linkis.rpc.conf.RPCConfiguration.{ BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX, BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX, BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY } -import org.apache.linkis.rpc.constant.RpcConstant import org.apache.linkis.rpc.interceptor._ import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct} import org.apache.linkis.server.Message - -import org.apache.commons.lang3.StringUtils +import org.apache.linkis.server.conf.ServerConfiguration import java.util import scala.concurrent.duration.Duration import scala.runtime.BoxedUnit +import feign.{Feign, Retryer} +import feign.slf4j.Slf4jLogger + private[rpc] class BaseRPCSender extends Sender with Logging { private var name: String = _ private var rpc: RPCReceiveRemote = _ - private var dynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = _ protected def getRPCInterceptors: Array[RPCInterceptor] = Array.empty @@ -68,21 +67,18 @@ private[rpc] class BaseRPCSender extends Sender with Logging { rpc } - private def getDynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = { - if (dynamicFeignClient == null) this synchronized { - if (dynamicFeignClient == null) dynamicFeignClient = new DynamicFeignClient() - } - dynamicFeignClient - } - private[rpc] def getApplicationName = name - def getSenderInstance(): String = { - null - } + protected def doBuilder(builder: Feign.Builder): Unit = + builder.retryer(Retryer.NEVER_RETRY) protected def newRPC: RPCReceiveRemote = { - getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name) + val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL) + doBuilder(builder) + var url = if (name.startsWith("http://")) name else "http://" + name + if (url.endsWith("/")) url = url.substring(0, url.length - 1) + url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue + builder.target(classOf[RPCReceiveRemote], url) } private def execute(message: Any)(op: => Any): Any = message match { @@ -94,9 +90,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging { override def ask(message: Any): Any = execute(message) { val msg = RPCProduct.getRPCProduct.toMessage(message) - if (StringUtils.isNotBlank(getSenderInstance())) { - BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance()) - } BaseRPCSender.addInstanceInfo(msg.getData) val response = getRPC.receiveAndReply(msg) RPCConsumer.getRPCConsumer.toObject(response) @@ -105,9 +98,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging { override def ask(message: Any, timeout: Duration): Any = execute(message) { val msg = RPCProduct.getRPCProduct.toMessage(message) msg.data("duration", timeout.toMillis) - if (StringUtils.isNotBlank(getSenderInstance())) { - BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance()) - } BaseRPCSender.addInstanceInfo(msg.getData) val response = getRPC.receiveAndReplyInMills(msg) RPCConsumer.getRPCConsumer.toObject(response) @@ -115,9 +105,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging { private def sendIt(message: Any, op: Message => Message): Unit = execute(message) { val msg = RPCProduct.getRPCProduct.toMessage(message) - if (StringUtils.isNotBlank(getSenderInstance())) { - BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance()) - } BaseRPCSender.addInstanceInfo(msg.getData) RPCConsumer.getRPCConsumer.toObject(op(msg)) match { case w: WarnException => logger.warn("RPC requests an alarm!(RPC请求出现告警!)", w) @@ -188,16 +175,4 @@ private[rpc] object BaseRPCSender extends Logging { ServiceInstance(name, instance) } - def addFixedInstanceInfo(map: util.Map[String, Object], fixedInstance: String): Unit = { - map.put(RpcConstant.FIXED_INSTANCE, fixedInstance) - } - - def getFixedInstanceInfo(message: Message): String = { - if (null != message && null != message.getData) { - message.getData.getOrDefault(RpcConstant.FIXED_INSTANCE, null).asInstanceOf[String] - } else { - null - } - } - } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala index 458ada9308..c539652d31 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala @@ -23,22 +23,13 @@ import org.springframework.web.bind.annotation.{RequestBody, RequestMapping, Req private[rpc] trait RPCReceiveRemote { - @RequestMapping( - value = Array("${spring.mvc.servlet.path}/rpc/receive"), - method = Array(RequestMethod.POST) - ) + @RequestMapping(value = Array("/rpc/receive"), method = Array(RequestMethod.POST)) def receive(@RequestBody message: Message): Message - @RequestMapping( - value = Array("${spring.mvc.servlet.path}/rpc/receiveAndReply"), - method = Array(RequestMethod.POST) - ) + @RequestMapping(value = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST)) def receiveAndReply(@RequestBody message: Message): Message - @RequestMapping( - value = Array("${spring.mvc.servlet.path}/rpc/replyInMills"), - method = Array(RequestMethod.POST) - ) + @RequestMapping(value = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST)) def receiveAndReplyInMills(@RequestBody message: Message): Message } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala index ff542aaad5..00fa019d99 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala @@ -33,7 +33,6 @@ private[rpc] object RPCSpringBeanCache extends Logging { private var rpcServerLoader: RPCServerLoader = _ private var senderBuilders: Array[BroadcastSenderBuilder] = _ private var rpcReceiveRestful: RPCReceiveRestful = _ - private var rpcReceiveRemote: RPCReceiveRemote = _ def registerReceiver(receiverName: String, receiver: Receiver): Unit = { if (beanNameToReceivers == null) { @@ -64,13 +63,6 @@ private[rpc] object RPCSpringBeanCache extends Logging { rpcReceiveRestful } - def getRPCReceiveRemote: RPCReceiveRemote = { - if (rpcReceiveRemote == null) { - rpcReceiveRemote = getApplicationContext.getBean(classOf[RPCReceiveRemote]) - } - rpcReceiveRemote - } - private[rpc] def getReceivers: util.Map[String, Receiver] = { if (beanNameToReceivers == null) { beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver]) diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala index 4faeaa180e..d835eef328 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala @@ -42,25 +42,15 @@ import feign.RetryableException class RetryableRPCInterceptor extends RPCInterceptor { override val order: Int = 20 -// private val commonRetryHandler = new RPCRetryHandler -// commonRetryHandler.setRetryInfo(new RetryableProtocol{}) -// -// private def isCommonRetryHandler(retry: RetryableProtocol): Boolean = retry.maxPeriod == commonRetryHandler.getRetryMaxPeriod && -// retry.period == commonRetryHandler.getRetryPeriod && retry.retryNum == commonRetryHandler.getRetryNum && -// (retry.retryExceptions.isEmpty || commonRetryHandler.getRetryExceptions.containsSlice(retry.retryExceptions)) - override def intercept( interceptorExchange: RPCInterceptorExchange, chain: RPCInterceptorChain ): Any = interceptorExchange.getProtocol match { case retry: RetryableProtocol => val retryName = retry.getClass.getSimpleName -// if(isCommonRetryHandler(retry)) commonRetryHandler.retry(chain.handle(interceptorExchange), retryName) -// else { val retryHandler = new RPCRetryHandler retryHandler.setRetryInfo(retry, chain) retryHandler.retry(chain.handle(interceptorExchange), retryName) -// } case _ => chain.handle(interceptorExchange) } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala index 1aae1f0cf3..ae1070865a 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala @@ -18,17 +18,21 @@ package org.apache.linkis.rpc.sender import org.apache.linkis.common.ServiceInstance +import org.apache.linkis.common.utils.Logging import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache} import org.apache.linkis.rpc.interceptor.{RPCInterceptor, ServiceInstanceRPCInterceptorChain} +import org.apache.linkis.server.conf.ServerConfiguration import org.apache.commons.lang3.StringUtils -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.core.env.Environment +import feign._ private[rpc] class SpringMVCRPCSender private[rpc] ( private[rpc] val serviceInstance: ServiceInstance -) extends BaseRPCSender(serviceInstance.getApplicationName) { +) extends BaseRPCSender(serviceInstance.getApplicationName) + with Logging { + + import SpringCloudFeignConfigurationCache._ override protected def getRPCInterceptors: Array[RPCInterceptor] = RPCSpringBeanCache.getRPCInterceptors @@ -36,8 +40,38 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( override protected def createRPCInterceptorChain() = new ServiceInstanceRPCInterceptorChain(0, getRPCInterceptors, serviceInstance) - @Autowired - private var env: Environment = _ + /** + * If it's a random call, you don't need to set target specify instance,need to specify target and + * do not set client setting + * @param builder + */ + override protected def doBuilder(builder: Feign.Builder): Unit = { + if (serviceInstance != null && StringUtils.isNotBlank(serviceInstance.getInstance)) { + builder.requestInterceptor(new RequestInterceptor() { + def apply(template: RequestTemplate): Unit = { + template.target( + s"http://${serviceInstance.getInstance}${ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue}" + ) + } + }) + } + super.doBuilder(builder) + if (StringUtils.isBlank(serviceInstance.getInstance)) { + builder + .contract(getContract) + .encoder(getEncoder) + .decoder(getDecoder) + .client(getClient) + .requestInterceptor(getRPCTicketIdRequestInterceptor) + } else { + builder + .contract(getContract) + .encoder(getEncoder) + .decoder(getDecoder) + .requestInterceptor(getRPCTicketIdRequestInterceptor) + } + + } /** * Deliver is an asynchronous method that requests the target microservice asynchronously, @@ -66,12 +100,4 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( s"RPCSender(${serviceInstance.getApplicationName})" } else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})" - override def getSenderInstance(): String = { - if (null != serviceInstance) { - serviceInstance.getInstance - } else { - null - } - } - } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala index df197ddb2c..829a967aab 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala @@ -18,6 +18,7 @@ package org.apache.linkis.governance.common.protocol.job import org.apache.linkis.governance.common.entity.job.JobRequest +import org.apache.linkis.protocol.RetryableProtocol import org.apache.linkis.protocol.message.RequestProtocol import java.util @@ -25,7 +26,7 @@ import java.util.Date import scala.beans.BeanProperty -trait JobReq extends RequestProtocol +trait JobReq extends RequestProtocol with RetryableProtocol case class JobReqInsert(jobReq: JobRequest) extends JobReq diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala index 4d0b8952ca..17c01fcfc2 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala @@ -18,6 +18,7 @@ package org.apache.linkis.governance.common.protocol.task import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.protocol.RetryableProtocol import org.apache.linkis.protocol.message.RequestProtocol import java.util @@ -91,7 +92,7 @@ trait TaskState extends RequestProtocol {} case class RequestTaskPause(execId: String) extends TaskState case class RequestTaskResume(execId: String) extends TaskState -case class RequestTaskKill(execId: String) extends TaskState +case class RequestTaskKill(execId: String) extends TaskState with RetryableProtocol /** * The status of requesting job execution, mainly used for:
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala index 971bdf247b..ef1355d580 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala @@ -25,5 +25,4 @@ import org.apache.linkis.protocol.message.RequestProtocol * @param pid */ case class ResponseEngineConnPid(serviceInstance: ServiceInstance, pid: String, ticketId: String) - extends RetryableProtocol - with RequestProtocol + extends RequestProtocol diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala index b136c61099..a4a7837da0 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala @@ -30,8 +30,7 @@ case class ResponseTaskProgress( execId: String, progress: Float, progressInfo: Array[JobProgressInfo] -) extends RetryableProtocol - with RequestProtocol +) extends RequestProtocol case class ResponseEngineLock(lock: String) @@ -67,9 +66,7 @@ case class ResponseEngineStatus( engineInfo: ResponseEngineInfo ) -case class ResponseTaskLog(execId: String, log: String) - extends RetryableProtocol - with RequestProtocol +case class ResponseTaskLog(execId: String, log: String) extends RequestProtocol case class ResponseTaskError(execId: String, errorMsg: String) extends RetryableProtocol diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala index 6b4fc64fe6..9dba95ef66 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala @@ -154,8 +154,7 @@ class TaskExecutionServiceImpl sender = Sender.getSender(task.getCallbackServiceInstance()) sender.send(msg) } else { - // todo - logger.debug("SendtoEntrance error, cannot find entrance instance.") + logger.warn("SendtoEntrance error, cannot find entrance instance.") } } { t => val errorMsg = s"SendToEntrance error. $msg" + t.getCause diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala index 3b3005fee6..8bcc79b410 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala @@ -22,7 +22,7 @@ import org.apache.linkis.protocol.message.RequestProtocol import java.util -trait EngineResourceRequest extends RequestProtocol { +trait EngineResourceRequest { val user: String val labels: util.List[Label[_]] val properties: util.Map[String, String] diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java index 069afd4f1b..603641fa78 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.linkis.bml.client.BmlClientFactory; import org.apache.linkis.bml.protocol.BmlResourceVersionsResponse; import org.apache.linkis.bml.protocol.Version; +import org.apache.linkis.common.utils.SecurityUtils; import org.apache.linkis.common.utils.ZipUtils; import org.apache.linkis.engineplugin.server.dao.EngineConnBmlResourceDao; import org.apache.linkis.engineplugin.server.entity.EngineConnBmlResource; @@ -28,6 +29,8 @@ import org.apache.linkis.engineplugin.server.restful.EnginePluginRestful; import org.apache.linkis.engineplugin.server.service.EnginePluginAdminService; import org.apache.linkis.engineplugin.vo.EnginePluginBMLVo; +import org.apache.linkis.manager.am.exception.AMErrorCode; +import org.apache.linkis.manager.am.exception.AMErrorException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -37,6 +40,7 @@ import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; +import java.text.MessageFormat; import java.util.List; import com.github.pagehelper.PageHelper; @@ -79,6 +83,11 @@ public List getTypeList() { @Override public void deleteEnginePluginBML(String ecType, String version, String username) { List allEngineConnBmlResource = null; + if (ecType != null && SecurityUtils.containsRelativePath(ecType)) { + throw new AMErrorException( + AMErrorCode.EC_PLUGIN_ERROR.getErrorCode(), + MessageFormat.format(AMErrorCode.EC_PLUGIN_ERROR.getErrorDesc(), ecType)); + } try { allEngineConnBmlResource = engineConnBmlResourceDao.getAllEngineConnBmlResource(ecType, version); @@ -88,7 +97,9 @@ public void deleteEnginePluginBML(String ecType, String version, String username engineConnBmlResourceDao.delete(engineConnBmlResource); }); String engineConnsHome = defaultEngineConnBmlResourceGenerator.getEngineConnsHome(); + File file = new File(engineConnsHome + "/" + ecType); + if (file.exists()) { deleteDir(file); log.info("file {} delete success", ecType); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java index c05768739c..cc8997d857 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java @@ -32,7 +32,8 @@ public enum AMErrorCode implements LinkisErrorCode { ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"), - EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)"); + EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)"), + EC_PLUGIN_ERROR(210007, "ECType {0} contains RelativePath"); private final int errorCode; diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java index a49613f8d1..38299081d8 100644 --- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java +++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java @@ -192,6 +192,7 @@ protected DataSource buildDataSource(String dbUrl, Map propertie datasource.setUrl(dbUrl); datasource.setUsername(username); datasource.setPassword(password); + datasource.setConnectProperties(SecurityUtils.getMysqlSecurityParams()); datasource.setDriverClassName(driverClassName); datasource.setInitialSize(initialSize); datasource.setMinIdle(minIdle); diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java index d7559aa5cf..0844ce2cdf 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java @@ -158,7 +158,10 @@ public Connection getDBConnection(ConnectMessage connectMessage, String database url += "?" + extraParamString; } LOG.info("jdbc connection url: {}", url); - return DriverManager.getConnection(url, connectMessage.username, connectMessage.password); + Properties properties = SecurityUtils.getMysqlSecurityParams(); + properties.setProperty("user", connectMessage.username); + properties.setProperty("password", connectMessage.password); + return DriverManager.getConnection(url, properties); } public String getSqlConnectUrl() { diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java index d6db4f7571..657d4531f3 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java @@ -21,6 +21,7 @@ import org.apache.linkis.basedatamanager.server.request.ConfigurationTemplateSaveRequest; import org.apache.linkis.basedatamanager.server.response.EngineLabelResponse; import org.apache.linkis.basedatamanager.server.service.ConfigurationTemplateService; +import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.server.Message; import org.apache.linkis.server.utils.ModuleUserUtils; @@ -49,7 +50,11 @@ public class ConfigurationTemplateRestfulApi { @RequestMapping(path = "/save", method = RequestMethod.POST) public Message add( HttpServletRequest httpRequest, @RequestBody ConfigurationTemplateSaveRequest request) { - ModuleUserUtils.getOperationUser(httpRequest, "save a configuration template"); + String username = + ModuleUserUtils.getOperationUser(httpRequest, "save a configuration template"); + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } if (Objects.isNull(request) || StringUtils.isEmpty(request.getEngineLabelId()) || StringUtils.isEmpty(request.getKey()) @@ -67,8 +72,12 @@ public Message add( @ApiOperation(value = "delete", notes = "delete a configuration template", httpMethod = "DELETE") @RequestMapping(path = "/{keyId}", method = RequestMethod.DELETE) public Message delete(HttpServletRequest httpRequest, @PathVariable("keyId") Long keyId) { - ModuleUserUtils.getOperationUser( - httpRequest, "delete a configuration template, keyId: " + keyId); + String username = + ModuleUserUtils.getOperationUser( + httpRequest, "delete a configuration template, keyId: " + keyId); + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } Boolean flag = configurationTemplateService.deleteConfigurationTemplate(keyId); return Message.ok("").data("success: ", flag); } diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java index 7d39d493bd..4b0df0051d 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java @@ -107,8 +107,14 @@ public Message add( httpMethod = "DELETE") @RequestMapping(path = "/{id}", method = RequestMethod.DELETE) public Message remove(HttpServletRequest request, @PathVariable("id") Long id) { - ModuleUserUtils.getOperationUser( - request, "Remove a Datasource Access Record,id:" + id.toString()); + String username = + ModuleUserUtils.getOperationUser( + request, "Remove a Datasource Access Record,id:" + id.toString()); + + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } + boolean result = datasourceAccessService.removeById(id); return Message.ok("").data("result", result); } diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java index cf2953b0e4..e6029c761c 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java @@ -97,7 +97,12 @@ public Message add(HttpServletRequest request, @RequestBody DatasourceEnvEntity httpMethod = "DELETE") @RequestMapping(path = "/{id}", method = RequestMethod.DELETE) public Message remove(HttpServletRequest request, @PathVariable("id") Long id) { - ModuleUserUtils.getOperationUser(request, "Remove a Datasource Env Record,id:" + id.toString()); + String username = + ModuleUserUtils.getOperationUser( + request, "Remove a Datasource Env Record,id:" + id.toString()); + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } boolean result = datasourceEnvService.removeById(id); return Message.ok("").data("result", result); } diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java index 557c753227..b8ef65df5b 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java @@ -99,8 +99,12 @@ public Message add( httpMethod = "DELETE") @RequestMapping(path = "/{id}", method = RequestMethod.DELETE) public Message remove(HttpServletRequest request, @PathVariable("id") Long id) { - ModuleUserUtils.getOperationUser( - request, "Remove a Datasource Type Key Record,id:" + id.toString()); + String username = + ModuleUserUtils.getOperationUser( + request, "Remove a Datasource Type Key Record,id:" + id.toString()); + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } boolean result = datasourceTypeKeyService.removeById(id); return Message.ok("").data("result", result); } diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java index 9fc8ea9d73..3c47d9385f 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java @@ -95,8 +95,13 @@ public Message add(HttpServletRequest request, @RequestBody DatasourceTypeEntity httpMethod = "DELETE") @RequestMapping(path = "/{id}", method = RequestMethod.DELETE) public Message remove(HttpServletRequest request, @PathVariable("id") Long id) { - ModuleUserUtils.getOperationUser( - request, "Remove a Datasource Type Record,id:" + id.toString()); + String username = + ModuleUserUtils.getOperationUser( + request, "Remove a Datasource Type Record,id:" + id.toString()); + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } + boolean result = datasourceTypeService.removeById(id); return Message.ok("").data("result", result); } diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java index 7d5668c074..d55a8e258a 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java @@ -55,9 +55,17 @@ public class GatewayAuthTokenRestfulApi { @RequestMapping(path = "", method = RequestMethod.GET) public Message list( HttpServletRequest request, String searchName, Integer currentPage, Integer pageSize) { - ModuleUserUtils.getOperationUser( - request, "Query list data of Gateway Auth Token,search name:" + searchName); + + String username = + ModuleUserUtils.getOperationUser( + request, "Query list data of Gateway Auth Token,search name:" + searchName); + + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } + PageInfo pageList = gatewayAuthTokenService.getListByPage(searchName, currentPage, pageSize); + return Message.ok("").data("list", pageList); } @@ -65,8 +73,14 @@ public Message list( @ApiOperation(value = "get", notes = "Get a Gateway Auth Token Record by id", httpMethod = "GET") @RequestMapping(path = "/{id}", method = RequestMethod.GET) public Message get(HttpServletRequest request, @PathVariable("id") Long id) { - ModuleUserUtils.getOperationUser( - request, "Get a Gateway Auth Token Record,id:" + id.toString()); + + String username = + ModuleUserUtils.getOperationUser( + request, "Get a Gateway Auth Token Record,id:" + id.toString()); + + if (!Configuration.isAdmin(username)) { + return Message.error("User '" + username + "' is not admin user[非管理员用户]"); + } GatewayAuthTokenEntity gatewayAuthToken = gatewayAuthTokenService.getById(id); return Message.ok("").data("item", gatewayAuthToken); } diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala index 2bd50a1ac8..a684152ad9 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala @@ -101,12 +101,28 @@ class StaticAuthenticationStrategy(override protected val sessionMaxAliveTime: L override def isTimeout(authentication: Authentication): Boolean = System.currentTimeMillis() - authentication.getLastAccessTime >= serverSessionTimeout + /** + * Forced login needs to consider the situation of multiple calls at the same time. If there are + * simultaneous calls, it should not be updated. request time < last creatTime and last createTime + * - currentTime < 1s + * @param requestAction + * @param serverUrl + * @return + */ override def enforceLogin(requestAction: Action, serverUrl: String): Authentication = { val key = getKey(requestAction, serverUrl) if (key == null) return null + val requestTime = System.currentTimeMillis() key.intern() synchronized { - val authentication = tryLogin(requestAction, serverUrl) - putSession(key, authentication) + var authentication = getAuthenticationActionByKey(key) + if ( + authentication == null || (authentication.getCreateTime < requestTime && (System + .currentTimeMillis() - authentication.getCreateTime) > 1000) + ) { + authentication = tryLogin(requestAction, serverUrl) + putSession(key, authentication) + logger.info(s"$key try enforceLogin") + } authentication } } diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala index 6ea3fc1673..6d714a3cdd 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala @@ -52,6 +52,8 @@ class DWSAuthenticationResult(response: HttpResponse, serverUrl: String) override def getAuthentication: Authentication = if (getStatus == 0) new HttpAuthentication { private var lastAccessTime: Long = System.currentTimeMillis + private val createTime: Long = System.currentTimeMillis() + override def authToCookies: Array[Cookie] = Array.empty override def authToHeaders: util.Map[String, String] = new util.HashMap[String, String]() @@ -61,6 +63,9 @@ class DWSAuthenticationResult(response: HttpResponse, serverUrl: String) override def getLastAccessTime: Long = lastAccessTime override def updateLastAccessTime(): Unit = lastAccessTime = System.currentTimeMillis + + override def getCreateTime: Long = createTime + } else { throw new HttpMessageParseException( diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java similarity index 51% rename from linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java rename to linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java index 2f81ba84d5..206b31ccf5 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java @@ -15,21 +15,14 @@ * limitations under the License. */ -package org.apache.linkis.rpc.loadbalancer; +package org.apache.linkis.gateway.springcloud.http; -import org.apache.linkis.rpc.conf.CacheManualRefresher; import org.apache.linkis.rpc.constant.RpcConstant; -import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary; -import org.apache.linkis.rpc.exception.NoInstanceExistsException; -import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache$; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.*; import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier; @@ -37,38 +30,26 @@ import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; -import java.text.MessageFormat; import java.util.List; import java.util.Objects; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadLocalRandom; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -public class ServiceInstancePriorityLoadBalancer implements ReactorServiceInstanceLoadBalancer { +public class IpPriorityLoadBalancer implements ReactorServiceInstanceLoadBalancer { - private static final Log log = LogFactory.getLog(ServiceInstancePriorityLoadBalancer.class); - - @Autowired private CacheManualRefresher cacheManualRefresher; + private static final Logger logger = LoggerFactory.getLogger(IpPriorityLoadBalancer.class); private final String serviceId; - - final AtomicInteger position; private final ObjectProvider serviceInstanceListSupplierProvider; - public ServiceInstancePriorityLoadBalancer( - ObjectProvider serviceInstanceListSupplierProvider, - String serviceId) { - this(serviceInstanceListSupplierProvider, serviceId, (new Random()).nextInt(1000)); - } - - public ServiceInstancePriorityLoadBalancer( - ObjectProvider serviceInstanceListSupplierProvider, + public IpPriorityLoadBalancer( String serviceId, - int seedPosition) { + ObjectProvider serviceInstanceListSupplierProvider) { this.serviceId = serviceId; this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider; - this.position = new AtomicInteger(seedPosition); } @Override @@ -84,47 +65,15 @@ public Mono> choose(Request request) { return supplier .get(request) .next() - .map( - serviceInstances -> - processInstanceResponse(request, supplier, serviceInstances, clientIp)); + .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, clientIp)); } private Response processInstanceResponse( - Request request, ServiceInstanceListSupplier supplier, List serviceInstances, String clientIp) { Response serviceInstanceResponse = getInstanceResponse(serviceInstances, clientIp); - Long endTtime = System.currentTimeMillis() + 2 * 60 * 1000; - - List linkisLoadBalancerTypeList = - ((RequestDataContext) request.getContext()) - .getClientRequest() - .getHeaders() - .get(RpcConstant.LINKIS_LOAD_BALANCER_TYPE); - String linkisLoadBalancerType = - CollectionUtils.isNotEmpty(linkisLoadBalancerTypeList) - ? linkisLoadBalancerTypeList.get(0) - : null; - - while (null == serviceInstanceResponse - && StringUtils.isNoneBlank(clientIp) - && isRPC(linkisLoadBalancerType) - && System.currentTimeMillis() < endTtime) { - cacheManualRefresher.refresh(); - List instances = - SpringCloudFeignConfigurationCache$.MODULE$.discoveryClient().getInstances(serviceId); - serviceInstanceResponse = getInstanceResponse(instances, clientIp); - if (null == serviceInstanceResponse) { - try { - Thread.sleep(5000L); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { ((SelectedInstanceCallback) supplier) .selectedServiceInstance(serviceInstanceResponse.getServer()); @@ -132,40 +81,29 @@ && isRPC(linkisLoadBalancerType) return serviceInstanceResponse; } - private boolean isRPC(String linkisLoadBalancerType) { - return StringUtils.isNotBlank(linkisLoadBalancerType) - && linkisLoadBalancerType.equalsIgnoreCase(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC); - } - private Response getInstanceResponse( List instances, String clientIp) { if (instances.isEmpty()) { - log.warn("No servers available for service: " + serviceId); - return null; + logger.warn("No servers available for service: " + serviceId); + return new EmptyResponse(); } - int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; - - if (StringUtils.isBlank(clientIp)) { - return new DefaultResponse(instances.get(pos % instances.size())); + if (StringUtils.isEmpty(clientIp)) { + return new DefaultResponse( + instances.get(ThreadLocalRandom.current().nextInt(instances.size()))); } String[] ipAndPort = clientIp.split(":"); if (ipAndPort.length != 2) { - throw new NoInstanceExistsException( - LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorCode(), - MessageFormat.format(LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorDesc(), clientIp)); + return new DefaultResponse( + instances.get(ThreadLocalRandom.current().nextInt(instances.size()))); } ServiceInstance chooseInstance = null; for (ServiceInstance instance : instances) { if (Objects.equals(ipAndPort[0], instance.getHost()) && Objects.equals(ipAndPort[1], String.valueOf(instance.getPort()))) { - chooseInstance = instance; - break; + return new DefaultResponse(instance); } } - if (null == chooseInstance) { - return null; - } else { - return new DefaultResponse(chooseInstance); - } + return new DefaultResponse( + instances.get(ThreadLocalRandom.current().nextInt(instances.size()))); } } diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java similarity index 77% rename from linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java rename to linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java index f4d501dfe4..5c0f07a8d3 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java @@ -15,25 +15,21 @@ * limitations under the License. */ -package org.apache.linkis.rpc.loadbalancer; +package org.apache.linkis.gateway.springcloud.http; import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; -@Configuration -@LoadBalancerClients(defaultConfiguration = {LinkisLoadBalancerClientConfiguration.class}) public class LinkisLoadBalancerClientConfiguration { @Bean public ReactorLoadBalancer customLoadBalancer( Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); - return new ServiceInstancePriorityLoadBalancer( - loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); + return new IpPriorityLoadBalancer( + name, loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class)); } }