Skip to content

Commit

Permalink
Refactor AdminServer to own independent configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Pil0tXia committed Apr 17, 2024
1 parent c4675e5 commit 868170b
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ public class Constants {

public static final String GRPC = "GRPC";

public static final String ADMIN = "ADMIN";

public static final String OS_NAME_KEY = "os.name";

public static final String OS_WIN_PREFIX = "win";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.common.utils;

import static org.apache.eventmesh.common.Constants.ADMIN;
import static org.apache.eventmesh.common.Constants.GRPC;
import static org.apache.eventmesh.common.Constants.HTTP;
import static org.apache.eventmesh.common.Constants.TCP;
Expand All @@ -36,7 +37,7 @@ public class ConfigurationContextUtil {

private static final ConcurrentHashMap<String, CommonConfiguration> CONFIGURATION_MAP = new ConcurrentHashMap<>();

public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC);
public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC, ADMIN);

/**
* Save http, tcp, grpc configuration at startup for global use.
Expand Down
4 changes: 3 additions & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ eventMesh.sysid=0000
eventMesh.server.tcp.port=10000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
# HTTP Admin Server
eventMesh.server.admin.http.port=10106

########################## EventMesh TCP Configuration ##########################
Expand Down Expand Up @@ -67,6 +66,9 @@ eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

# HTTP Admin Server
eventMesh.server.admin.threads.num=2

########################## EventMesh Plugin Configuration ##########################
# storage plugin
eventMesh.storage.plugin.type=standalone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
import io.netty.util.ReferenceCountUtil;
import io.opentelemetry.api.trace.Span;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -96,6 +98,8 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {

private final transient EventMeshHTTPConfiguration eventMeshHttpConfiguration;

@Getter
@Setter
private HTTPMetricsServer metrics;

private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false);
Expand All @@ -105,8 +109,13 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}

protected final transient AtomicBoolean started = new AtomicBoolean(false);

private final transient boolean useTLS;

@Getter
@Setter
private Boolean useTrace = false; // Determine whether trace is enabled

private static final int MAX_CONNECTIONS = 20_000;

/**
Expand All @@ -118,10 +127,13 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
private HttpConnectionHandler httpConnectionHandler;
private HttpDispatcher httpDispatcher;

@Setter
@Getter
private HandlerService handlerService;
private final transient ThreadPoolExecutor asyncContextCompleteHandler =
ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext");

@Getter
private final HTTPThreadPoolGroup httpThreadPoolGroup;

public AbstractHTTPServer(final int port, final boolean useTLS,
Expand Down Expand Up @@ -524,32 +536,4 @@ protected void initChannel(final SocketChannel channel) {
httpDispatcher);
}
}

public void setUseTrace(final Boolean useTrace) {
this.useTrace = useTrace;
}

public Boolean getUseTrace() {
return useTrace;
}

public HTTPMetricsServer getMetrics() {
return metrics;
}

public void setMetrics(final HTTPMetricsServer metrics) {
this.metrics = metrics;
}

public HTTPThreadPoolGroup getHttpThreadPoolGroup() {
return httpThreadPoolGroup;
}

public HandlerService getHandlerService() {
return handlerService;
}

public void setHandlerService(HandlerService handlerService) {
this.handlerService = handlerService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,45 @@

package org.apache.eventmesh.runtime.boot;

import static org.apache.eventmesh.common.Constants.ADMIN;

import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.configuration.EventMeshAdminConfiguration;

import lombok.Getter;

public class EventMeshAdminBootstrap implements EventMeshBootstrap {

@Getter
private EventMeshAdminServer eventMeshAdminServer;

private EventMeshServer eventMeshServer;
private final EventMeshAdminConfiguration eventMeshAdminConfiguration;

private final EventMeshServer eventMeshServer;

public EventMeshAdminBootstrap(EventMeshServer eventMeshServer) {
this.eventMeshServer = eventMeshServer;

ConfigService configService = ConfigService.getInstance();
this.eventMeshAdminConfiguration = configService.buildConfigInstance(EventMeshAdminConfiguration.class);

ConfigurationContextUtil.putIfAbsent(ADMIN, eventMeshAdminConfiguration);
}

@Override
public void init() throws Exception {
if (eventMeshServer != null) {
eventMeshAdminServer = new EventMeshAdminServer(eventMeshServer);
eventMeshAdminServer = new EventMeshAdminServer(eventMeshServer, eventMeshAdminConfiguration);
eventMeshAdminServer.init();
}

}

@Override
public void start() throws Exception {
if (eventMeshAdminServer != null) {
eventMeshAdminServer.start();
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.runtime.admin.handler.AdminHandlerManager;
import org.apache.eventmesh.runtime.admin.handler.HttpHandler;
import org.apache.eventmesh.runtime.configuration.EventMeshAdminConfiguration;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.util.HttpResponseUtils;

import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -42,25 +48,35 @@
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventMeshAdminServer extends AbstractHTTPServer {

private final EventMeshAdminConfiguration eventMeshAdminConfiguration;

private HttpConnectionHandler httpConnectionHandler = new HttpConnectionHandler();

private AdminHandlerManager adminHandlerManager;

public EventMeshAdminServer(EventMeshServer eventMeshServer) {
super(eventMeshServer.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshServerAdminPort(),
false, eventMeshServer.getEventMeshHTTPServer().getEventMeshHttpConfiguration());
@Getter
private ThreadPoolExecutor adminExecutor;

public EventMeshAdminServer(final EventMeshServer eventMeshServer, final EventMeshAdminConfiguration eventMeshAdminConfiguration) {
super(eventMeshAdminConfiguration.getEventMeshServerAdminPort(),
eventMeshAdminConfiguration.isEventMeshServerUseTls(),
eventMeshAdminConfiguration);
this.eventMeshAdminConfiguration = eventMeshAdminConfiguration;
adminHandlerManager = new AdminHandlerManager(eventMeshServer);
}

@Override
public void init() throws Exception {
super.init("eventMesh-admin");
initThreadPool();
adminHandlerManager.registerHttpHandler();
registerAdminRequestProcessor();
}

@Override
Expand Down Expand Up @@ -94,21 +110,16 @@ public void start() throws Exception {
started.compareAndSet(false, true);
}

public void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) {
String uriStr = httpRequest.uri();
URI uri = URI.create(uriStr);
Optional<HttpHandler> httpHandlerOpt = adminHandlerManager.getHttpHandler(uri.getPath());
if (httpHandlerOpt.isPresent()) {
try {
httpHandlerOpt.get().handle(httpRequest, ctx);
} catch (Exception e) {
log.error("admin server channelRead error", e);
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
}
} else {
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
}
private void registerAdminRequestProcessor() {
final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor);
}

private void initThreadPool() {
adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(),
eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(),
new LinkedBlockingQueue<>(50), "eventMesh-runtime-admin", true);
}

private class AdminServerInitializer extends ChannelInitializer<SocketChannel> {
Expand All @@ -130,5 +141,22 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
}
});
}

private void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) {
String uriStr = httpRequest.uri();
URI uri = URI.create(uriStr);
Optional<HttpHandler> httpHandlerOpt = adminHandlerManager.getHttpHandler(uri.getPath());
if (httpHandlerOpt.isPresent()) {
try {
httpHandlerOpt.get().handle(httpRequest, ctx);
} catch (Exception e) {
log.error("admin server channelRead error", e);
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
}
} else {
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;

import lombok.Getter;

public class EventMeshGrpcBootstrap implements EventMeshBootstrap {

private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;

@Getter
private EventMeshGrpcServer eventMeshGrpcServer;

private final EventMeshServer eventMeshServer;
Expand Down Expand Up @@ -62,12 +65,4 @@ public void shutdown() throws Exception {
eventMeshGrpcServer.shutdown();
}
}

public EventMeshGrpcServer getEventMeshGrpcServer() {
return eventMeshGrpcServer;
}

public void setEventMeshGrpcServer(EventMeshGrpcServer eventMeshGrpcServer) {
this.eventMeshGrpcServer = eventMeshGrpcServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.CreateTopicProcessor;
Expand Down Expand Up @@ -95,15 +94,13 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private transient RateLimiter batchRateLimiter;

public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {

super(eventMeshHttpConfiguration.getHttpServerPort(),
eventMeshHttpConfiguration.isEventMeshServerUseTls(),
eventMeshHttpConfiguration);
this.eventMeshServer = eventMeshServer;
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
this.metaStorage = eventMeshServer.getMetaStorage();
this.acl = eventMeshServer.getAcl();

}

public void init() throws Exception {
Expand Down Expand Up @@ -251,9 +248,6 @@ private void registerHTTPRequestProcessor() throws Exception {
final SendAsyncRemoteEventProcessor sendAsyncRemoteEventProcessor = new SendAsyncRemoteEventProcessor(this);
this.getHandlerService().register(sendAsyncRemoteEventProcessor);

final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor);

final HeartBeatProcessor heartProcessor = new HeartBeatProcessor(this);
registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;

public class EventMeshHttpBootstrap implements EventMeshBootstrap {
import lombok.Getter;

private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
public class EventMeshHttpBootstrap implements EventMeshBootstrap {

@Getter
public EventMeshHTTPServer eventMeshHttpServer;

private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;

private final EventMeshServer eventMeshServer;

public EventMeshHttpBootstrap(final EventMeshServer eventMeshServer) {
Expand Down Expand Up @@ -64,12 +67,4 @@ public void shutdown() throws Exception {
eventMeshHttpServer.shutdown();
}
}

public EventMeshHTTPServer getEventMeshHttpServer() {
return eventMeshHttpServer;
}

public void setEventMeshHttpServer(EventMeshHTTPServer eventMeshHttpServer) {
this.eventMeshHttpServer = eventMeshHttpServer;
}
}
Loading

0 comments on commit 868170b

Please sign in to comment.