diff --git a/eventmesh-runtime/conf/eventmesh-admin-server.jks b/eventmesh-runtime/conf/eventmesh-admin-server.jks new file mode 100644 index 0000000000..92deb897a4 Binary files /dev/null and b/eventmesh-runtime/conf/eventmesh-admin-server.jks differ diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 280f7821f0..fd9e7964fe 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -26,7 +26,7 @@ eventMesh.server.http.port=10105 eventMesh.server.grpc.port=10205 eventMesh.server.admin.http.port=10106 -########################## EventMesh TCP Configuration ########################## +########################## EventMesh Network Configuration ########################## eventMesh.server.tcp.readerIdleSeconds=120 eventMesh.server.tcp.writerIdleSeconds=120 eventMesh.server.tcp.allIdleSeconds=120 @@ -66,8 +66,14 @@ eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200 eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 -# HTTP Admin Server +########################## EventMesh HTTP Admin Configuration ########################## eventMesh.server.admin.threads.num=2 +eventMesh.server.admin.useTls.enabled=true +eventMesh.server.admin.ssl.protocol=TLSv1.3 +eventMesh.server.admin.ssl.cer=eventmesh-admin-server.jks +eventMesh.server.admin.ssl.pass=eventmesh-admin-server +eventMesh.server.admin.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 +eventMesh.server.admin.blacklist.ipv6=::/128,::1/128,ff00::/8 ########################## EventMesh Plugin Configuration ########################## # storage plugin diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 5e887e3359..3a3fbec280 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -110,6 +110,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer { protected final transient AtomicBoolean started = new AtomicBoolean(false); + @Getter private final transient boolean useTLS; @Getter diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java index fb09116620..e02637ec39 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java @@ -22,7 +22,6 @@ import org.apache.eventmesh.common.utils.ThreadUtils; import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager; -import java.util.Objects; import java.util.concurrent.TimeUnit; import io.netty.channel.EventLoopGroup; @@ -31,22 +30,32 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.EventExecutorGroup; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** * The most basic server */ @Slf4j +@Getter public abstract class AbstractRemotingServer implements RemotingServer { private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); private static final int DEFAULT_SLEEP_SECONDS = 30; + @Setter private EventLoopGroup bossGroup; + + @Setter private EventLoopGroup ioGroup; + + @Setter private EventExecutorGroup workerGroup; + protected ProducerManager producerManager; + @Setter private int port; protected void buildBossGroup(final String threadPrefix) { @@ -75,10 +84,6 @@ protected void initProducerManager() throws Exception { producerManager.init(); } - public ProducerManager getProducerManager() { - return producerManager; - } - public void init(final String threadPrefix) throws Exception { buildBossGroup(threadPrefix); buildIOGroup(threadPrefix); @@ -94,16 +99,16 @@ public void shutdown() throws Exception { bossGroup.shutdownGracefully(); log.info("shutdown bossGroup"); } - if (Objects.isNull(producerManager)) { + if (producerManager != null) { producerManager.shutdown(); } + ThreadUtils.randomPause(TimeUnit.SECONDS.toMillis(DEFAULT_SLEEP_SECONDS)); if (ioGroup != null) { ioGroup.shutdownGracefully(); log.info("shutdown ioGroup"); } - if (workerGroup != null) { workerGroup.shutdownGracefully(); @@ -114,36 +119,4 @@ public void shutdown() throws Exception { protected boolean useEpoll() { return SystemUtils.isLinuxPlatform() && Epoll.isAvailable(); } - - public EventLoopGroup getBossGroup() { - return bossGroup; - } - - public void setBossGroup(final EventLoopGroup bossGroup) { - this.bossGroup = bossGroup; - } - - public EventLoopGroup getIoGroup() { - return ioGroup; - } - - public void setIoGroup(final EventLoopGroup ioGroup) { - this.ioGroup = ioGroup; - } - - public EventExecutorGroup getWorkerGroup() { - return workerGroup; - } - - public void setWorkerGroup(final EventExecutorGroup workerGroup) { - this.workerGroup = workerGroup; - } - - public int getPort() { - return port; - } - - public void setPort(final int port) { - this.port = port; - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java index 549cba0595..5e98fc690b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java @@ -31,6 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -47,6 +50,7 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.ssl.SslHandler; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -61,7 +65,7 @@ public class EventMeshAdminServer extends AbstractHTTPServer { private AdminHandlerManager adminHandlerManager; @Getter - private ThreadPoolExecutor adminExecutor; + private ThreadPoolExecutor adminMetricsExecutor; public EventMeshAdminServer(final EventMeshServer eventMeshServer, final EventMeshAdminConfiguration eventMeshAdminConfiguration) { super(eventMeshAdminConfiguration.getEventMeshServerAdminPort(), @@ -86,8 +90,9 @@ public void start() throws Exception { try { bootstrap.group(this.getBossGroup(), this.getIoGroup()) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) - .childHandler(new AdminServerInitializer()) - .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); + .childHandler(new AdminServerInitializer( + this.isUseTLS() ? SSLContextFactory.getSslContext(eventMeshAdminConfiguration) : null, this.isUseTLS())) + .childOption(ChannelOption.AUTO_CLOSE, Boolean.TRUE); log.info("AdminHttpServer[port={}] started.", this.getPort()); @@ -110,24 +115,38 @@ public void start() throws Exception { started.compareAndSet(false, true); } - private void registerAdminRequestProcessor() { - final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this); - registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor); - } - private void initThreadPool() { - adminExecutor = ThreadPoolFactory.createThreadPoolExecutor( + adminMetricsExecutor = ThreadPoolFactory.createThreadPoolExecutor( eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(), eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(), - new LinkedBlockingQueue<>(50), "eventMesh-runtime-admin", true); + new LinkedBlockingQueue<>(50), "eventMesh-admin-metrics", true); + } + + private void registerAdminRequestProcessor() { + final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this); + registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor); } private class AdminServerInitializer extends ChannelInitializer { + private final transient SSLContext sslContext; + private final transient boolean useTLS; + + public AdminServerInitializer(final SSLContext sslContext, final boolean useTLS) { + this.sslContext = sslContext; + this.useTLS = useTLS; + } + @Override protected void initChannel(final SocketChannel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (sslContext != null && useTLS) { + final SSLEngine sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(false); + pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine)); + } + pipeline.addLast(getWorkerGroup(), new HttpRequestDecoder(), new HttpResponseEncoder(), diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index 18e294cfda..e246549940 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -67,6 +67,7 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.RateLimiter; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -74,6 +75,7 @@ * Add multiple managers to the underlying server */ @Slf4j +@Getter public class EventMeshHTTPServer extends AbstractHTTPServer { private final EventMeshServer eventMeshServer; @@ -293,62 +295,4 @@ private void registerWebhook() throws Exception { this.getHandlerService().register(webHookProcessor, super.getHttpThreadPoolGroup().getWebhookExecutor()); } - - public SubscriptionManager getSubscriptionManager() { - return subscriptionManager; - } - - public ConsumerManager getConsumerManager() { - return consumerManager; - } - - public ProducerManager getProducerManager() { - return producerManager; - } - - public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() { - return eventMeshHttpConfiguration; - } - - public EventBus getEventBus() { - return eventBus; - } - - public HttpRetryer getHttpRetryer() { - return httpRetryer; - } - - public Acl getAcl() { - return acl; - } - - public EventMeshServer getEventMeshServer() { - return eventMeshServer; - } - - public RateLimiter getMsgRateLimiter() { - return msgRateLimiter; - } - - public RateLimiter getBatchRateLimiter() { - return batchRateLimiter; - } - - public FilterEngine getFilterEngine() { - return filterEngine; - } - - public TransformerEngine getTransformerEngine() { - return transformerEngine; - } - - public MetaStorage getMetaStorage() { - return metaStorage; - } - - public HTTPClientPool getHttpClientPool() { - return httpClientPool; - } - - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 6bf292e830..bdf0e857fb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -202,8 +202,8 @@ public void shutdown() throws Exception { } producerTopicManager.shutdown(); ConfigurationContextUtil.clear(); - serviceState = ServiceState.STOPPED; + serviceState = ServiceState.STOPPED; log.info(SERVER_STATE_MSG, serviceState); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java index 0f48220a4d..46803d33d7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java @@ -40,23 +40,16 @@ public class SSLContextFactory { - private static String protocol = "TLSv1.1"; - - private static String fileName; - - private static String password; - public static SSLContext getSslContext(final EventMeshHTTPConfiguration eventMeshHttpConfiguration) - throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, - UnrecoverableKeyException, KeyManagementException { + throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException { + + String protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol(); + String fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer(); + String password = eventMeshHttpConfiguration.getEventMeshServerSSLPass(); SSLContext sslContext; - try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME - + File.separator - + fileName), StandardOpenOption.READ)) { - protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol(); - fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer(); - password = eventMeshHttpConfiguration.getEventMeshServerSSLPass(); + try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + fileName), + StandardOpenOption.READ)) { char[] filePass = StringUtils.isNotBlank(password) ? password.toCharArray() : new char[0]; final KeyStore keyStore = KeyStore.getInstance("JKS"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java index 210b48da33..0a65209ce1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java @@ -45,13 +45,13 @@ public class EventMeshAdminConfiguration extends EventMeshHTTPConfiguration { private boolean eventMeshServerUseTls = false; @ConfigField(field = "admin.ssl.protocol") - private String eventMeshServerSSLProtocol = "TLSv1.1"; + private String eventMeshServerSSLProtocol = "TLSv1.3"; @ConfigField(field = "admin.ssl.cer") - private String eventMeshServerSSLCer = "sChat2.jks"; + private String eventMeshServerSSLCer = "eventmesh-admin-server.jks"; @ConfigField(field = "admin.ssl.pass") - private String eventMeshServerSSLPass = "sNetty"; + private String eventMeshServerSSLPass = "eventmesh-admin-server"; @ConfigField(field = "admin.blacklist.ipv4") private List eventMeshIpv4BlackList = Collections.emptyList(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java index a0ce9e92dc..9a8b369341 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java @@ -39,6 +39,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext @Override public Executor executor() { - return eventMeshAdminServer.getAdminExecutor(); + return eventMeshAdminServer.getAdminMetricsExecutor(); } }