Skip to content

Commit

Permalink
修复:重连认证失败;publicExecutor线程池阻塞,导致认证失败,新增privateExecutor线程池
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouhailin committed Dec 15, 2021
1 parent 346fa60 commit d9466ea
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected InboundClientOption newInboundClientOption() {
.rcvBufSize(properties.getRcvBufSize())
.workerGroupThread(properties.getWorkerGroupThread())
.publicExecutorThread(properties.getPublicExecutorThread())
.privateExecutorThread(properties.getPrivateExecutorThread())
.callbackExecutorThread(properties.getCallbackExecutorThread())
.defaultTimeoutSeconds(properties.getDefaultTimeoutSeconds())
.readTimeoutSeconds(properties.getReadTimeoutSeconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class InboundClientProperties {
private int rcvBufSize = 65535;
private int workerGroupThread = Runtime.getRuntime().availableProcessors() * 2;
private int publicExecutorThread = Runtime.getRuntime().availableProcessors() * 2;
private int privateExecutorThread = Runtime.getRuntime().availableProcessors() * 2;
private int callbackExecutorThread = Runtime.getRuntime().availableProcessors() * 2;
private int defaultTimeoutSeconds = 5;
private int readTimeoutSeconds = 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ abstract class AbstractNettyInboundClient implements ChannelEventListener, Inbou
final Bootstrap bootstrap;
final EventLoopGroup workerGroup;
final ExecutorService publicExecutor;
final ExecutorService privateExecutor;

final InboundClientOption option;

Expand All @@ -60,6 +61,8 @@ abstract class AbstractNettyInboundClient implements ChannelEventListener, Inbou

publicExecutor = new ScheduledThreadPoolExecutor(option.publicExecutorThread(),
new DefaultThreadFactory("publicExecutor", true));
privateExecutor = new ScheduledThreadPoolExecutor(option.privateExecutorThread(),
new DefaultThreadFactory("privateExecutor", true));

workerGroup = new NioEventLoopGroup(option.workerGroupThread());
bootstrap.group(workerGroup)
Expand All @@ -80,7 +83,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("readTimeout", new ReadTimeoutHandler(option.readTimeoutSeconds()));
}
// now the inbound client logic
pipeline.addLast("clientHandler", new InboundChannelHandler(AbstractNettyInboundClient.this, publicExecutor, option.disablePublicExecutor()));
pipeline.addLast("clientHandler", new InboundChannelHandler(AbstractNettyInboundClient.this, publicExecutor, privateExecutor, option.disablePublicExecutor()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class InboundChannelHandler extends SimpleChannelInboundHandler<EslMessag
private final Queue<SyncCallback> syncCallbacks = new ConcurrentLinkedQueue<>();
private final ChannelEventListener listener;
private final ExecutorService publicExecutor;
private final ExecutorService privateExecutor;
private final boolean disablePublicExecutor;
private Channel channel;
private String remoteAddr;
Expand All @@ -69,9 +70,10 @@ public class InboundChannelHandler extends SimpleChannelInboundHandler<EslMessag
* @param publicExecutor a {@link java.util.concurrent.ExecutorService} object.
* @param disablePublicExecutor a boolean.
*/
public InboundChannelHandler(ChannelEventListener listener, ExecutorService publicExecutor, boolean disablePublicExecutor) {
public InboundChannelHandler(ChannelEventListener listener, ExecutorService publicExecutor, ExecutorService privateExecutor, boolean disablePublicExecutor) {
this.listener = listener;
this.publicExecutor = publicExecutor;
this.privateExecutor = privateExecutor;
this.disablePublicExecutor = disablePublicExecutor;
}

Expand Down Expand Up @@ -151,7 +153,7 @@ private void handleEslMessage(EslMessage message) {
break;
case EslHeaders.Value.AUTH_REQUEST:
log.debug("Auth request received [{}]", message);
publicExecutor.execute(() -> listener.handleAuthRequest(remoteAddr, this));
privateExecutor.execute(() -> listener.handleAuthRequest(remoteAddr, this));
break;
case EslHeaders.Value.TEXT_DISCONNECT_NOTICE:
log.debug("Disconnect notice received [{}]", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class InboundClientOption {

private int workerGroupThread = Runtime.getRuntime().availableProcessors() * 2;
private int publicExecutorThread = Runtime.getRuntime().availableProcessors() * 2;
private int privateExecutorThread = Runtime.getRuntime().availableProcessors() * 2;
private int callbackExecutorThread = Runtime.getRuntime().availableProcessors() * 2;

private int defaultTimeoutSeconds = 5;
Expand Down Expand Up @@ -145,6 +146,26 @@ public InboundClientOption publicExecutorThread(int publicExecutorThread) {
return this;
}

/**
* <p>privateExecutorThread.</p>
*
* @return a int.
*/
public int privateExecutorThread() {
return privateExecutorThread;
}

/**
* <p>privateExecutorThread.</p>
*
* @param privateExecutorThread a int.
* @return a {@link link.thingscloud.freeswitch.esl.inbound.option.InboundClientOption} object.
*/
public InboundClientOption privateExecutorThread(int privateExecutorThread) {
this.privateExecutorThread = privateExecutorThread;
return this;
}

/**
* <p>callbackExecutorThread.</p>
*
Expand Down

1 comment on commit d9466ea

@thd555
Copy link

@thd555 thd555 commented on d9466ea Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

依然没解决publicExecutor线程池阻塞的问题呀

Please sign in to comment.