Skip to content

Commit

Permalink
[ISSUES #4933]Add Admin Module
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN authored May 29, 2024
1 parent 78942c4 commit aa8f604
Show file tree
Hide file tree
Showing 71 changed files with 1,841 additions and 691 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.apache.eventmesh.admin.server;

import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.PagedList;

import com.apache.eventmesh.admin.server.task.Task;

public interface Admin extends ComponentLifeCycle{
public interface Admin extends ComponentLifeCycle {
/**
* support for web or ops
**/
Expand All @@ -17,8 +17,5 @@ public interface Admin extends ComponentLifeCycle{
/**
* support for task
*/
void reportHeartbeat(HeartBeat heartBeat);



void reportHeartbeat(ReportHeartBeatRequest heartBeat);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,50 @@
package com.apache.eventmesh.admin.server;

import com.apache.eventmesh.admin.server.task.Task;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.PagedList;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryFactory;
import org.apache.eventmesh.registry.RegistryService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

public class AdminServer implements Admin {
import javax.annotation.PostConstruct;

private RegistryService registryService;
@Service
@Slf4j
public class AdminServer implements Admin, ApplicationListener<ApplicationReadyEvent> {

// private EventMeshAdminServerRegisterInfo registerInfo;
private final RegistryService registryService;

public AdminServer(RegistryService registryService) {
this.registryService = registryService;
// this.registerInfo = registerInfo;
private final RegisterServerInfo adminServeInfo;

private final CommonConfiguration configuration;

public AdminServer(AdminServerProperties properties) {
configuration =
ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class);
if (configuration == null) {
throw new AdminServerRuntimeException(ErrorCode.STARTUP_CONFIG_MISS, "common configuration file miss");
}
this.adminServeInfo = new RegisterServerInfo();

adminServeInfo.setHealth(true);
adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + properties.getPort());
String name = Constants.ADMIN_SERVER_REGISTRY_NAME;
if (StringUtils.isNotBlank(properties.getServiceName())) {
name = properties.getServiceName();
}
adminServeInfo.setServiceName(name);
registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType());
}


Expand All @@ -37,18 +69,35 @@ public PagedList<Task> getTaskPaged(Task task) {
}

@Override
public void reportHeartbeat(HeartBeat heartBeat) {
public void reportHeartbeat(ReportHeartBeatRequest heartBeat) {

}

@Override
@PostConstruct
public void start() {
registryService.register(null);
if (configuration.isEventMeshRegistryPluginEnabled()) {
registryService.init();
}
}

@Override
public void destroy() {
registryService.unRegister(null);
registryService.shutdown();
if (configuration.isEventMeshRegistryPluginEnabled()) {
registryService.unRegister(adminServeInfo);
try {
Thread.sleep(3000);
} catch (InterruptedException ignore) {
}
registryService.shutdown();
}
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (configuration.isEventMeshRegistryPluginEnabled()) {
log.info("application is started and registry plugin is enabled, it's will register admin self");
registryService.register(adminServeInfo);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.apache.eventmesh.admin.server;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("event-mesh.admin-server")
@Getter
@Setter
public class AdminServerProperties {
private int port;
private boolean enableSSL;
private String configurationPath;
private String configurationFile;
private String serviceName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.apache.eventmesh.admin.server;

import lombok.Getter;

public class AdminServerRuntimeException extends RuntimeException {
@Getter
private final int code;
public AdminServerRuntimeException(int code, String message) {
super(message);
this.code = code;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.apache.eventmesh.admin.server;

public interface ComponentLifeCycle {
void start();
void start() throws Exception;
void destroy();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.apache.eventmesh.admin.server;

import com.apache.eventmesh.admin.server.constatns.AdminServerConstants;
import org.apache.eventmesh.common.config.ConfigService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleAdminServer {
public static void main(String[] args) throws Exception {
ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE);
SpringApplication.run(ExampleAdminServer.class);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.apache.eventmesh.admin.server.constatns;

public class AdminServerConstants {
public static final String CONF_ENV = "configurationPath";

public static final String EVENTMESH_CONF_HOME = System.getProperty(CONF_ENV, System.getenv(CONF_ENV));

public static final String EVENTMESH_CONF_FILE = "eventmesh-admin.properties";
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.apache.eventmesh.admin.server.web;

import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
import org.apache.eventmesh.common.remote.response.FailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase {
@Autowired
RequestHandlerFactory handlerFactory;

private Payload process(Payload value) {
if (value == null || StringUtils.isBlank(value.getMetadata().getType())) {
return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not " +
"exists"));
}
try {
BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
handlerFactory.getHandler(value.getMetadata().getType());
if (handler == null) {
return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
"not match any request handler"));
}
BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
if (response == null || response instanceof EmptyAckResponse) {
return null;
}
return PayloadUtil.from(response);
} catch (Exception e) {
log.warn("process payload {} fail", value.getMetadata().getType(), e);
if (e instanceof AdminServerRuntimeException) {
return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException)e).getCode(),
e.getMessage()));
}
return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err"));
}
}

public StreamObserver<Payload> invokeBiStream(StreamObserver<Payload> responseObserver) {
return new StreamObserver<Payload>() {
@Override
public void onNext(Payload value) {
Payload payload = process(value);
if (payload == null) {
return;
}
responseObserver.onNext(payload);
}

@Override
public void onError(Throwable t) {
if (responseObserver instanceof ServerCallStreamObserver) {
if (!((ServerCallStreamObserver<Payload>) responseObserver).isCancelled()) {
log.warn("admin gRPC server fail", t);
}
}
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

public void invoke(Payload request, StreamObserver<Payload> responseObserver) {
responseObserver.onNext(process(request));
responseObserver.onCompleted();
}
}
Loading

0 comments on commit aa8f604

Please sign in to comment.