Skip to content

Commit

Permalink
update config of user customized biz threadPool && fix npe in async
Browse files Browse the repository at this point in the history
  • Loading branch information
YangruiEmma committed Aug 9, 2019
1 parent 551fe66 commit f451295
Show file tree
Hide file tree
Showing 30 changed files with 545 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package com.meituan.dorado.common.thread;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

public class ExecutorUtil {

private static final long DEFAULT_THREAD_KEEP_ALIVE_TIME = 30L;

public static void shutdownExecutors(List<ExecutorService> executors, int timeoutMillis) {
if (executors == null) {
return;
Expand All @@ -43,4 +44,30 @@ public static void shutdownExecutor(ExecutorService executor, int timeoutMillis)
executor.shutdownNow();
}
}

public static ThreadPoolExecutor getThreadPool(int corePoolSize, int maximumPoolSize, int workQueueSize,
BlockingQueue<Runnable> workQueue, DefaultThreadFactory threadFactory) {
if (workQueue != null) {
return new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
DEFAULT_THREAD_KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
workQueue,
threadFactory);
} else if (workQueueSize > 0) {
return new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
DEFAULT_THREAD_KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(workQueueSize),
threadFactory);
} else {
return new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
DEFAULT_THREAD_KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class DefaultResponse implements Response {

private Long seq;

private String serviceName;

private Class<?> serviceInterface;

private byte statusCode;
Expand Down Expand Up @@ -78,6 +80,7 @@ private void initField(DefaultRequest request) {
throw new TimeoutException("Request has removed, cause Timeout happened earlier.");
}
this.seq = request.getSeq();
this.serviceName = request.getServiceName();
this.serviceInterface = request.getServiceInterface();
this.messageType = request.getMessageType();
this.doChecksum = request.getDoChecksum();
Expand Down Expand Up @@ -301,7 +304,6 @@ public void setThriftMsgInfo(ThriftMessageInfo thriftMsgInfo) {

@Override
public String toString() {
String serviceInterfaceName = serviceInterface == null ? "Unknown" : serviceInterface.getName();
return "Response(" + serviceInterfaceName + ")";
return "Response(" + serviceName + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import com.meituan.dorado.registry.RegistryFactory;
import com.meituan.dorado.rpc.handler.filter.Filter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class ProviderConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(ProviderConfig.class);

protected String appkey;
// mns, zookeeper://address?k=v&k=v; 没配置则从SPI中获取
private String registry;
Expand All @@ -40,6 +45,11 @@ public class ProviderConfig {
private int port = Constants.DEFAULT_SERVER_PORT;
// 网络IO线程数
private int ioWorkerThreadCount = Constants.DEFAULT_IO_WORKER_THREAD_COUNT;
// 业务线程池配置1: 参数
private int bizCoreWorkerThreadCount = Constants.DEFAULT_BIZ_CORE_WORKER_THREAD_COUNT;
private int bizMaxWorkerThreadCount = Constants.DEFAULT_BIZ_MAX_WORKER_THREAD_COUNT;
private int bizWorkerQueueSize = Constants.DEFAULT_BIZ_WORKER_QUEUES;
private BlockingQueue<Runnable> threadPoolQueue;

// 单端口单服务
private ServiceConfig serviceConfig;
Expand All @@ -61,6 +71,7 @@ public class ProviderConfig {
private volatile boolean destroyed;

public void init() {
check();
if (StringUtils.isBlank(registry)) {
RegistryFactory registryFactory = ExtensionLoader.getExtension(RegistryFactory.class);
registry = registryFactory.getName();
Expand All @@ -76,6 +87,14 @@ public void init() {
ServicePublisher.publishService(this);
}

protected void check() {
if (bizCoreWorkerThreadCount > bizMaxWorkerThreadCount) {
throw new IllegalArgumentException("bizCoreWorkerThreadCount must less than bizMaxWorkerThreadCount");
} else if (bizCoreWorkerThreadCount == bizMaxWorkerThreadCount) {
LOGGER.warn("bizCoreWorkerThreadCount equals to bizMaxWorkerThreadCount, it may cause many idle thread kept in pool.");
}
}

public synchronized void destroy() {
if (destroyed) {
return;
Expand Down Expand Up @@ -199,6 +218,38 @@ public void setEnv(String env) {
this.env = env;
}

public int getBizCoreWorkerThreadCount() {
return bizCoreWorkerThreadCount;
}

public void setBizCoreWorkerThreadCount(int bizCoreWorkerThreadCount) {
this.bizCoreWorkerThreadCount = bizCoreWorkerThreadCount;
}

public int getBizMaxWorkerThreadCount() {
return bizMaxWorkerThreadCount;
}

public void setBizMaxWorkerThreadCount(int bizMaxWorkerThreadCount) {
this.bizMaxWorkerThreadCount = bizMaxWorkerThreadCount;
}

public int getBizWorkerQueueSize() {
return bizWorkerQueueSize;
}

public void setBizWorkerQueueSize(int bizWorkerQueueSize) {
this.bizWorkerQueueSize = bizWorkerQueueSize;
}

public BlockingQueue<Runnable> getThreadPoolQueue() {
return threadPoolQueue;
}

public void setThreadPoolQueue(BlockingQueue<Runnable> threadPoolQueue) {
this.threadPoolQueue = threadPoolQueue;
}

class ShutDownHook extends Thread {
private ProviderConfig config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ public class ServiceConfig<T> extends AbstractConfig {
// 接口实现 必配项
private T serviceImpl;

// 业务线程池配置1: 参数
private int bizCoreWorkerThreadCount = Constants.DEFAULT_BIZ_CORE_WORKER_THREAD_COUNT;
private int bizMaxWorkerThreadCount = Constants.DEFAULT_BIZ_MAX_WORKER_THREAD_COUNT;
private int bizWorkerQueueSize = Constants.DEFAULT_BIZ_WORKER_QUEUES;
// 业务线程池配置2: 线程池对象
private ExecutorService bizWorkerExecutor;
// 业务线程池配置3: 方法粒度线程池对象
Expand All @@ -52,11 +48,6 @@ protected void check() {
}
serviceInterface = interfaces[0];
}
if (bizCoreWorkerThreadCount > bizMaxWorkerThreadCount) {
throw new IllegalArgumentException("bizCoreWorkerThreadCount must less than bizMaxWorkerThreadCount");
} else if (bizCoreWorkerThreadCount == bizMaxWorkerThreadCount) {
LOGGER.warn("bizCoreWorkerThreadCount equals to bizMaxWorkerThreadCount, it may cause many idle thread keep in pool.");
}

serviceName = serviceInterface.getName();
if (serviceName.indexOf(Constants.LINK_SUB_CLASS_SYMBOL) > 0) {
Expand Down Expand Up @@ -86,30 +77,6 @@ public void setServiceImpl(T serviceImpl) {
this.serviceImpl = serviceImpl;
}

public int getBizCoreWorkerThreadCount() {
return bizCoreWorkerThreadCount;
}

public void setBizCoreWorkerThreadCount(int bizCoreWorkerThreadCount) {
this.bizCoreWorkerThreadCount = bizCoreWorkerThreadCount;
}

public int getBizMaxWorkerThreadCount() {
return bizMaxWorkerThreadCount;
}

public void setBizMaxWorkerThreadCount(int bizMaxWorkerThreadCount) {
this.bizMaxWorkerThreadCount = bizMaxWorkerThreadCount;
}

public int getBizWorkerQueueSize() {
return bizWorkerQueueSize;
}

public void setBizWorkerQueueSize(int bizWorkerQueueSize) {
this.bizWorkerQueueSize = bizWorkerQueueSize;
}

public ExecutorService getBizWorkerExecutor() {
return bizWorkerExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ public <T> ResponseFuture<T> asyncCall(Callable<T> callable) {
try {
try {
setAttachment(Constants.ASYNC, Boolean.TRUE);
final T o = callable.call();
if (o != null) {
logger.warn("Do async call but actual action is sync");
return new MockFuture(o);
}
callable.call();
} catch (Exception e) {
throw new RpcException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void run() {
}
});
}
} else if (value != null) {
} else {
if (executor == null) {
callback.onComplete(value);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.meituan.dorado.trace.meta.TraceTimeline;
import com.meituan.dorado.transport.meta.Request;
import com.meituan.dorado.transport.meta.Response;
import com.meituan.dorado.util.MethodUtil;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -60,7 +61,9 @@ private Response handleAsync(Request request, ResponseFuture future) {
ServiceInvocationRepository.addTimeoutTask(request, future);
AsyncContext.getContext().setFuture(future);
Response response = buildResponse(request);
response.setResult(new RpcResult());
RpcResult result = new RpcResult();
result.setReturnVal(MethodUtil.getDefaultResult(request.getData().getMethod().getReturnType()));
response.setResult(result);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.meituan.dorado.common.Constants;
import com.meituan.dorado.common.RpcRole;
import com.meituan.dorado.common.exception.ProtocolException;
import com.meituan.dorado.common.exception.RpcException;
import com.meituan.dorado.common.exception.ServiceException;
import com.meituan.dorado.common.extension.ExtensionLoader;
Expand Down Expand Up @@ -45,7 +44,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 每个端口服务的channel handler
Expand All @@ -64,7 +65,7 @@ public class ProviderChannelHandler implements ChannelHandler {
private final Map<String, Class<?>> serviceInterfaceMap = new HashMap<>();
private final Map<String, Object> serviceImplMap = new HashMap<>();

private final ExecutorService defaultExecutor;
private ThreadPoolExecutor defaultExecutor;

private final Map<String, ExecutorService> serviceExecutorMap = new HashMap<>();
private final Map<String, Map<String, ExecutorService>> methodExecutorMap = new HashMap<>();
Expand All @@ -75,18 +76,22 @@ public ProviderChannelHandler(ProviderConfig providerConfig) {
serviceImplMap.put(serviceConfig.getServiceName(), serviceConfig.getServiceImpl());
initThreadPoolExecutor(serviceConfig);
}

defaultExecutor = new ThreadPoolExecutor(Constants.DEFAULT_BIZ_CORE_WORKER_THREAD_COUNT,
Constants.DEFAULT_BIZ_MAX_WORKER_THREAD_COUNT,
Constants.IDLE_THREAD_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory(genServerBizThreadPoolName(providerConfig)));
initDefaultThreadPool(providerConfig);

FilterHandler actualHandler = buildActualInvokeHandler();
filterHandler = InvokeChainBuilder.initInvokeChain(actualHandler, providerConfig.getFilters(), RpcRole.PROVIDER);
}

private void initDefaultThreadPool(ProviderConfig providerConfig) {
DefaultThreadFactory threadFactory = new DefaultThreadFactory(genServerBizThreadPoolName(providerConfig));
defaultExecutor = ExecutorUtil.getThreadPool(providerConfig.getBizCoreWorkerThreadCount(),
providerConfig.getBizMaxWorkerThreadCount(),
providerConfig.getBizWorkerQueueSize(),
providerConfig.getThreadPoolQueue(),
threadFactory);
defaultExecutor.prestartAllCoreThreads();
}

private String genServerBizThreadPoolName(ProviderConfig providerConfig) {
String bizThreadPoolName = "DoradoServerBizWorker-";
if (providerConfig.getServiceConfigList().size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,31 @@ private static String parameterTypesToString(Class<?>[] parameterTypes) {
buf.append(")");
return buf.toString();
}

public static Object getDefaultResult(Class<?> returnType) {
if (returnType.isPrimitive()) {
String returnTypeName = returnType.getSimpleName();
if ("boolean".equals(returnTypeName)) {
return false;
} else if ("char".equals(returnTypeName)) {
return '0';
} else if ("byte".equals(returnTypeName)) {
return (byte) 0;
} else if ("short".equals(returnTypeName)) {
return (short) 0;
} else if ("int".equals(returnTypeName)) {
return 0;
} else if ("long".equals(returnTypeName)) {
return (long) 0;
} else if ("float".equals(returnTypeName)) {
return (float) 0;
} else if ("double".equals(returnTypeName)) {
return (double) 0;
} else {
return null;
}
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public Integer call() throws Exception {

Assert.assertNull(context.getAttachment(Constants.ASYNC));
try {
Assert.assertTrue(result.get() == 10);
Assert.assertTrue(result == null);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package com.meituan.dorado.demo.thrift.apiway;

import com.meituan.dorado.bootstrap.ServiceBootstrap;
import com.meituan.dorado.config.service.ReferenceConfig;
import com.meituan.dorado.test.thrift.api.HelloService;
import org.apache.thrift.TException;
import org.junit.Assert;

public class ThriftConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.meituan.dorado.demo.thrift.apiway;

import com.meituan.dorado.bootstrap.ServiceBootstrap;
import com.meituan.dorado.config.service.ProviderConfig;
import com.meituan.dorado.config.service.ServiceConfig;
import com.meituan.dorado.test.thrift.api.HelloService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.meituan.dorado.demo.thrift.async;

import com.meituan.dorado.bootstrap.ServiceBootstrap;
import com.meituan.dorado.rpc.AsyncContext;
import com.meituan.dorado.rpc.ResponseCallback;
import com.meituan.dorado.rpc.ResponseFuture;
Expand Down
Loading

0 comments on commit f451295

Please sign in to comment.