Skip to content

Commit

Permalink
Support TLS/SSL for AdminServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Pil0tXia committed Apr 17, 2024
1 parent 868170b commit b5fe68d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 128 deletions.
Binary file added eventmesh-runtime/conf/eventmesh-admin-server.jks
Binary file not shown.
10 changes: 8 additions & 2 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
eventMesh.server.admin.http.port=10106

########################## EventMesh TCP Configuration ##########################
########################## EventMesh Network Configuration ##########################
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
Expand Down Expand Up @@ -66,8 +66,14 @@ eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

# HTTP Admin Server
########################## EventMesh HTTP Admin Configuration ##########################
eventMesh.server.admin.threads.num=2
eventMesh.server.admin.useTls.enabled=true
eventMesh.server.admin.ssl.protocol=TLSv1.3
eventMesh.server.admin.ssl.cer=eventmesh-admin-server.jks
eventMesh.server.admin.ssl.pass=eventmesh-admin-server
eventMesh.server.admin.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.admin.blacklist.ipv6=::/128,::1/128,ff00::/8

########################## EventMesh Plugin Configuration ##########################
# storage plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {

protected final transient AtomicBoolean started = new AtomicBoolean(false);

@Getter
private final transient boolean useTLS;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.netty.channel.EventLoopGroup;
Expand All @@ -31,22 +30,32 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
* The most basic server
*/
@Slf4j
@Getter
public abstract class AbstractRemotingServer implements RemotingServer {

private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_SLEEP_SECONDS = 30;

@Setter
private EventLoopGroup bossGroup;

@Setter
private EventLoopGroup ioGroup;

@Setter
private EventExecutorGroup workerGroup;

protected ProducerManager producerManager;

@Setter
private int port;

protected void buildBossGroup(final String threadPrefix) {
Expand Down Expand Up @@ -75,10 +84,6 @@ protected void initProducerManager() throws Exception {
producerManager.init();
}

public ProducerManager getProducerManager() {
return producerManager;
}

public void init(final String threadPrefix) throws Exception {
buildBossGroup(threadPrefix);
buildIOGroup(threadPrefix);
Expand All @@ -94,16 +99,16 @@ public void shutdown() throws Exception {
bossGroup.shutdownGracefully();
log.info("shutdown bossGroup");
}
if (Objects.isNull(producerManager)) {
if (producerManager != null) {
producerManager.shutdown();
}

ThreadUtils.randomPause(TimeUnit.SECONDS.toMillis(DEFAULT_SLEEP_SECONDS));

if (ioGroup != null) {
ioGroup.shutdownGracefully();
log.info("shutdown ioGroup");
}

if (workerGroup != null) {
workerGroup.shutdownGracefully();

Expand All @@ -114,36 +119,4 @@ public void shutdown() throws Exception {
protected boolean useEpoll() {
return SystemUtils.isLinuxPlatform() && Epoll.isAvailable();
}

public EventLoopGroup getBossGroup() {
return bossGroup;
}

public void setBossGroup(final EventLoopGroup bossGroup) {
this.bossGroup = bossGroup;
}

public EventLoopGroup getIoGroup() {
return ioGroup;
}

public void setIoGroup(final EventLoopGroup ioGroup) {
this.ioGroup = ioGroup;
}

public EventExecutorGroup getWorkerGroup() {
return workerGroup;
}

public void setWorkerGroup(final EventExecutorGroup workerGroup) {
this.workerGroup = workerGroup;
}

public int getPort() {
return port;
}

public void setPort(final int port) {
this.port = port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -47,6 +50,7 @@
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -61,7 +65,7 @@ public class EventMeshAdminServer extends AbstractHTTPServer {
private AdminHandlerManager adminHandlerManager;

@Getter
private ThreadPoolExecutor adminExecutor;
private ThreadPoolExecutor adminMetricsExecutor;

public EventMeshAdminServer(final EventMeshServer eventMeshServer, final EventMeshAdminConfiguration eventMeshAdminConfiguration) {
super(eventMeshAdminConfiguration.getEventMeshServerAdminPort(),
Expand All @@ -86,8 +90,9 @@ public void start() throws Exception {
try {
bootstrap.group(this.getBossGroup(), this.getIoGroup())
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new AdminServerInitializer())
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
.childHandler(new AdminServerInitializer(
this.isUseTLS() ? SSLContextFactory.getSslContext(eventMeshAdminConfiguration) : null, this.isUseTLS()))
.childOption(ChannelOption.AUTO_CLOSE, Boolean.TRUE);

log.info("AdminHttpServer[port={}] started.", this.getPort());

Expand All @@ -110,24 +115,38 @@ public void start() throws Exception {
started.compareAndSet(false, true);
}

private void registerAdminRequestProcessor() {
final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor);
}

private void initThreadPool() {
adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(
adminMetricsExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(),
eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(),
new LinkedBlockingQueue<>(50), "eventMesh-runtime-admin", true);
new LinkedBlockingQueue<>(50), "eventMesh-admin-metrics", true);
}

private void registerAdminRequestProcessor() {
final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor);
}

private class AdminServerInitializer extends ChannelInitializer<SocketChannel> {

private final transient SSLContext sslContext;
private final transient boolean useTLS;

public AdminServerInitializer(final SSLContext sslContext, final boolean useTLS) {
this.sslContext = sslContext;
this.useTLS = useTLS;
}

@Override
protected void initChannel(final SocketChannel channel) {
final ChannelPipeline pipeline = channel.pipeline();

if (sslContext != null && useTLS) {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine));
}

pipeline.addLast(getWorkerGroup(),
new HttpRequestDecoder(),
new HttpResponseEncoder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.RateLimiter;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


/**
* Add multiple managers to the underlying server
*/
@Slf4j
@Getter
public class EventMeshHTTPServer extends AbstractHTTPServer {

private final EventMeshServer eventMeshServer;
Expand Down Expand Up @@ -293,62 +295,4 @@ private void registerWebhook() throws Exception {

this.getHandlerService().register(webHookProcessor, super.getHttpThreadPoolGroup().getWebhookExecutor());
}

public SubscriptionManager getSubscriptionManager() {
return subscriptionManager;
}

public ConsumerManager getConsumerManager() {
return consumerManager;
}

public ProducerManager getProducerManager() {
return producerManager;
}

public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() {
return eventMeshHttpConfiguration;
}

public EventBus getEventBus() {
return eventBus;
}

public HttpRetryer getHttpRetryer() {
return httpRetryer;
}

public Acl getAcl() {
return acl;
}

public EventMeshServer getEventMeshServer() {
return eventMeshServer;
}

public RateLimiter getMsgRateLimiter() {
return msgRateLimiter;
}

public RateLimiter getBatchRateLimiter() {
return batchRateLimiter;
}

public FilterEngine getFilterEngine() {
return filterEngine;
}

public TransformerEngine getTransformerEngine() {
return transformerEngine;
}

public MetaStorage getMetaStorage() {
return metaStorage;
}

public HTTPClientPool getHttpClientPool() {
return httpClientPool;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ public void shutdown() throws Exception {
}
producerTopicManager.shutdown();
ConfigurationContextUtil.clear();
serviceState = ServiceState.STOPPED;

serviceState = ServiceState.STOPPED;
log.info(SERVER_STATE_MSG, serviceState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,16 @@

public class SSLContextFactory {

private static String protocol = "TLSv1.1";

private static String fileName;

private static String password;

public static SSLContext getSslContext(final EventMeshHTTPConfiguration eventMeshHttpConfiguration)
throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException,
UnrecoverableKeyException, KeyManagementException {
throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException {

String protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol();
String fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer();
String password = eventMeshHttpConfiguration.getEventMeshServerSSLPass();
SSLContext sslContext;

try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME
+ File.separator
+ fileName), StandardOpenOption.READ)) {
protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol();
fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer();
password = eventMeshHttpConfiguration.getEventMeshServerSSLPass();
try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + fileName),
StandardOpenOption.READ)) {

char[] filePass = StringUtils.isNotBlank(password) ? password.toCharArray() : new char[0];
final KeyStore keyStore = KeyStore.getInstance("JKS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ public class EventMeshAdminConfiguration extends EventMeshHTTPConfiguration {
private boolean eventMeshServerUseTls = false;

@ConfigField(field = "admin.ssl.protocol")
private String eventMeshServerSSLProtocol = "TLSv1.1";
private String eventMeshServerSSLProtocol = "TLSv1.3";

@ConfigField(field = "admin.ssl.cer")
private String eventMeshServerSSLCer = "sChat2.jks";
private String eventMeshServerSSLCer = "eventmesh-admin-server.jks";

@ConfigField(field = "admin.ssl.pass")
private String eventMeshServerSSLPass = "sNetty";
private String eventMeshServerSSLPass = "eventmesh-admin-server";

@ConfigField(field = "admin.blacklist.ipv4")
private List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

@Override
public Executor executor() {
return eventMeshAdminServer.getAdminExecutor();
return eventMeshAdminServer.getAdminMetricsExecutor();
}
}

0 comments on commit b5fe68d

Please sign in to comment.