From aa8f604221b3ea6633c8bde87d7f8c555bffcfd0 Mon Sep 17 00:00:00 2001 From: sodaRyCN <35725024+sodaRyCN@users.noreply.github.com> Date: Wed, 29 May 2024 10:53:18 +0800 Subject: [PATCH] [ISSUES #4933]Add Admin Module --- .../apache/eventmesh/admin/server/Admin.java | 11 +- .../admin/server/AdminException.java | 11 - .../eventmesh/admin/server/AdminServer.java | 71 ++++- .../admin/server/AdminServerProperties.java | 16 + .../server/AdminServerRuntimeException.java | 12 + .../admin/server/ComponentLifeCycle.java | 2 +- .../EventMeshAdminServerConfiguration.java | 32 -- .../admin/server/ExampleAdminServer.java | 14 + .../eventmesh/admin/server/HeartBeat.java | 14 - .../constatns/AdminServerConstants.java | 9 + .../eventmesh/admin/server/task/Job.java | 8 - .../eventmesh/admin/server/task/JobState.java | 10 - .../eventmesh/admin/server/task/JobType.java | 7 - .../eventmesh/admin/server/task/Position.java | 5 - .../eventmesh/admin/server/task/Task.java | 17 -- .../admin/server/web/AdminGrpcServer.java | 85 ++++++ .../admin/server/web/BaseServer.java | 30 ++ .../admin/server/web/GrpcServer.java | 44 ++- .../admin/server/web/db/DBThreadPool.java | 38 +++ .../web/db/entity/EventMeshDataSource.java | 35 +++ .../web/db/entity/EventMeshJobDetail.java | 29 ++ .../web/db/entity/EventMeshJobInfo.java | 41 +++ .../web/db/entity/EventMeshMysqlPosition.java | 35 +++ .../EventMeshPositionReporterHistory.java | 29 ++ .../db/entity/EventMeshRuntimeHeartbeat.java | 33 +++ .../db/entity/EventMeshRuntimeHistory.java | 27 ++ .../db/mapper/EventMeshDataSourceMapper.java | 20 ++ .../web/db/mapper/EventMeshJobInfoMapper.java | 20 ++ .../mapper/EventMeshMysqlPositionMapper.java | 20 ++ ...ventMeshPositionReporterHistoryMapper.java | 20 ++ .../EventMeshRuntimeHeartbeatMapper.java | 20 ++ .../mapper/EventMeshRuntimeHistoryMapper.java | 20 ++ .../service/EventMeshDataSourceService.java | 13 + .../db/service/EventMeshJobInfoService.java | 12 + .../EventMeshMysqlPositionService.java | 12 + ...entMeshPositionReporterHistoryService.java | 13 + .../EventMeshRuntimeHeartbeatService.java | 12 + .../EventMeshRuntimeHistoryService.java | 13 + .../impl/EventMeshDataSourceServiceImpl.java | 22 ++ .../impl/EventMeshJobInfoServiceImpl.java | 23 ++ .../EventMeshMysqlPositionServiceImpl.java | 23 ++ ...eshPositionReporterHistoryServiceImpl.java | 22 ++ .../EventMeshRuntimeHeartbeatServiceImpl.java | 23 ++ .../EventMeshRuntimeHistoryServiceImpl.java | 22 ++ .../generated/AdminBiStreamServiceGrpc.java | 263 ----------------- .../web/generated/AdminServiceGrpc.java | 279 ------------------ .../web/handler/BaseRequestHandler.java | 13 + .../web/handler/RequestHandlerFactory.java | 47 +++ .../handler/impl/FetchJobRequestHandler.java | 51 ++++ .../handler/impl/FetchPositionHandler.java | 36 +++ .../handler/impl/ReportHeartBeatHandler.java | 50 ++++ .../handler/impl/ReportPositionHandler.java | 81 +++++ .../EventMeshRuntimeHeartbeatBizService.java | 63 ++++ .../job/EventMeshJobInfoBizService.java | 111 +++++++ .../position/EventMeshPositionBizService.java | 61 ++++ .../position/IFetchPositionHandler.java | 9 + .../position/IReportPositionHandler.java | 8 + .../web/service/position/PositionHandler.java | 7 + .../position/PositionHandlerFactory.java | 35 +++ .../position/impl/MysqlPositionHandler.java | 140 +++++++++ ...mesh.admin.server.registry.RegistryService | 16 - .../main/resources/META-INF/spring.factories | 2 + .../src/main/resources/application.yaml | 17 +- .../main/resources/eventmesh-admin.properties | 2 + .../src/main/resources/eventmesh.sql | 114 +++++++ .../mapper/EventMeshDataSourceMapper.xml | 23 ++ .../mapper/EventMeshJobInfoMapper.xml | 27 ++ .../mapper/EventMeshMysqlPositionMapper.xml | 23 ++ ...EventMeshPositionReporterHistoryMapper.xml | 19 ++ .../EventMeshRuntimeHeartbeatMapper.xml | 22 ++ .../mapper/EventMeshRuntimeHistoryMapper.xml | 18 ++ 71 files changed, 1841 insertions(+), 691 deletions(-) delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java delete mode 100644 eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService create mode 100644 eventmesh-admin-server/src/main/resources/META-INF/spring.factories create mode 100644 eventmesh-admin-server/src/main/resources/eventmesh-admin.properties create mode 100644 eventmesh-admin-server/src/main/resources/eventmesh.sql create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml create mode 100644 eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java index 1090f7b592..9be047edc6 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -1,10 +1,10 @@ package com.apache.eventmesh.admin.server; +import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; import org.apache.eventmesh.common.utils.PagedList; -import com.apache.eventmesh.admin.server.task.Task; - -public interface Admin extends ComponentLifeCycle{ +public interface Admin extends ComponentLifeCycle { /** * support for web or ops **/ @@ -17,8 +17,5 @@ public interface Admin extends ComponentLifeCycle{ /** * support for task */ - void reportHeartbeat(HeartBeat heartBeat); - - - + void reportHeartbeat(ReportHeartBeatRequest heartBeat); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java deleted file mode 100644 index eca5eeb0df..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.apache.eventmesh.admin.server; - -public class AdminException extends RuntimeException { - public AdminException(String message) { - super(message); - } - - public AdminException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java index b4ab41a635..a76b4c194c 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java @@ -1,18 +1,50 @@ package com.apache.eventmesh.admin.server; -import com.apache.eventmesh.admin.server.task.Task; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.PagedList; +import org.apache.eventmesh.registry.RegisterServerInfo; +import org.apache.eventmesh.registry.RegistryFactory; import org.apache.eventmesh.registry.RegistryService; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; -public class AdminServer implements Admin { +import javax.annotation.PostConstruct; - private RegistryService registryService; +@Service +@Slf4j +public class AdminServer implements Admin, ApplicationListener { -// private EventMeshAdminServerRegisterInfo registerInfo; + private final RegistryService registryService; - public AdminServer(RegistryService registryService) { - this.registryService = registryService; -// this.registerInfo = registerInfo; + private final RegisterServerInfo adminServeInfo; + + private final CommonConfiguration configuration; + + public AdminServer(AdminServerProperties properties) { + configuration = + ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class); + if (configuration == null) { + throw new AdminServerRuntimeException(ErrorCode.STARTUP_CONFIG_MISS, "common configuration file miss"); + } + this.adminServeInfo = new RegisterServerInfo(); + + adminServeInfo.setHealth(true); + adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + properties.getPort()); + String name = Constants.ADMIN_SERVER_REGISTRY_NAME; + if (StringUtils.isNotBlank(properties.getServiceName())) { + name = properties.getServiceName(); + } + adminServeInfo.setServiceName(name); + registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType()); } @@ -37,18 +69,35 @@ public PagedList getTaskPaged(Task task) { } @Override - public void reportHeartbeat(HeartBeat heartBeat) { + public void reportHeartbeat(ReportHeartBeatRequest heartBeat) { } @Override + @PostConstruct public void start() { - registryService.register(null); + if (configuration.isEventMeshRegistryPluginEnabled()) { + registryService.init(); + } } @Override public void destroy() { - registryService.unRegister(null); - registryService.shutdown(); + if (configuration.isEventMeshRegistryPluginEnabled()) { + registryService.unRegister(adminServeInfo); + try { + Thread.sleep(3000); + } catch (InterruptedException ignore) { + } + registryService.shutdown(); + } + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + if (configuration.isEventMeshRegistryPluginEnabled()) { + log.info("application is started and registry plugin is enabled, it's will register admin self"); + registryService.register(adminServeInfo); + } } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java new file mode 100644 index 0000000000..ec507d5994 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java @@ -0,0 +1,16 @@ +package com.apache.eventmesh.admin.server; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("event-mesh.admin-server") +@Getter +@Setter +public class AdminServerProperties { + private int port; + private boolean enableSSL; + private String configurationPath; + private String configurationFile; + private String serviceName; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java new file mode 100644 index 0000000000..0282bab393 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java @@ -0,0 +1,12 @@ +package com.apache.eventmesh.admin.server; + +import lombok.Getter; + +public class AdminServerRuntimeException extends RuntimeException { + @Getter + private final int code; + public AdminServerRuntimeException(int code, String message) { + super(message); + this.code = code; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java index 76abd005be..cc12f1afdd 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java @@ -1,6 +1,6 @@ package com.apache.eventmesh.admin.server; public interface ComponentLifeCycle { - void start(); + void start() throws Exception; void destroy(); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java deleted file mode 100644 index aab5b7cc78..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.apache.eventmesh.admin.server; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import org.apache.eventmesh.common.config.CommonConfiguration; -import org.apache.eventmesh.common.config.Config; -import org.apache.eventmesh.common.config.ConfigFiled; - -@Data -@NoArgsConstructor -@EqualsAndHashCode(callSuper = true) -@Config(prefix = "eventMesh.admin") -public class EventMeshAdminServerConfiguration extends CommonConfiguration { - @ConfigFiled(field = "server.http.port") - private int eventMeshHttpServerPort = 10000; - - @ConfigFiled(field = "server.gRPC.port") - private int eventMeshGrpcServerPort = 10000; - - @ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true) - private String registryCenterAddr = ""; - - @ConfigFiled(field = "registry.plugin.type", notEmpty = true) - private String eventMeshRegistryPluginType = "nacos"; - - @ConfigFiled(field = "registry.plugin.username") - private String eventMeshRegistryPluginUsername = ""; - - @ConfigFiled(field = "registry.plugin.password") - private String eventMeshRegistryPluginPassword = ""; -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java new file mode 100644 index 0000000000..51e2e10381 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java @@ -0,0 +1,14 @@ +package com.apache.eventmesh.admin.server; + +import com.apache.eventmesh.admin.server.constatns.AdminServerConstants; +import org.apache.eventmesh.common.config.ConfigService; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ExampleAdminServer { + public static void main(String[] args) throws Exception { + ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE); + SpringApplication.run(ExampleAdminServer.class); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java deleted file mode 100644 index 568b7ff312..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.apache.eventmesh.admin.server; - -import com.apache.eventmesh.admin.server.task.JobState; -import com.apache.eventmesh.admin.server.task.Position; -import lombok.Data; - -@Data -public class HeartBeat { - private String address; - private String reportedTimeStamp; - private String jobID; - private Position position; - private JobState state; -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java new file mode 100644 index 0000000000..d3c01fedc9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java @@ -0,0 +1,9 @@ +package com.apache.eventmesh.admin.server.constatns; + +public class AdminServerConstants { + public static final String CONF_ENV = "configurationPath"; + + public static final String EVENTMESH_CONF_HOME = System.getProperty(CONF_ENV, System.getenv(CONF_ENV)); + + public static final String EVENTMESH_CONF_FILE = "eventmesh-admin.properties"; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java deleted file mode 100644 index 1fe5b08976..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.apache.eventmesh.admin.server.task; - -public class Job { - private long id; - private long taskID; - private JobType type; - private JobState state; -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java deleted file mode 100644 index 845d91c4a3..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.apache.eventmesh.admin.server.task; - -public enum JobState { - INIT, - STARaTED, - PAUSE, - COMPLETE, - DELETE, - FAIL -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java deleted file mode 100644 index b694803988..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.apache.eventmesh.admin.server.task; - -public enum JobType { - FULL, - INCREASE, - STRUCT_SYNC -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java deleted file mode 100644 index 491f796a9b..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.apache.eventmesh.admin.server.task; - -public class Position { - -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java deleted file mode 100644 index 4f6cb7cfe9..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.apache.eventmesh.admin.server.task; - -// task : job = 1 : m -public class Task { - private long id; - private String name; - private String desc; - private String uid; - private String sourceUser; - private String sourcePasswd; - private String targetUser; - private String targetPasswd; - private int sourceType; - private int targetType; - - -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java new file mode 100644 index 0000000000..751ffd85b1 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -0,0 +1,85 @@ +package com.apache.eventmesh.admin.server.web; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.payload.PayloadUtil; +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; +import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.apache.eventmesh.common.remote.response.FailResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase { + @Autowired + RequestHandlerFactory handlerFactory; + + private Payload process(Payload value) { + if (value == null || StringUtils.isBlank(value.getMetadata().getType())) { + return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not " + + "exists")); + } + try { + BaseRequestHandler handler = + handlerFactory.getHandler(value.getMetadata().getType()); + if (handler == null) { + return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN, + "not match any request handler")); + } + BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); + if (response == null || response instanceof EmptyAckResponse) { + return null; + } + return PayloadUtil.from(response); + } catch (Exception e) { + log.warn("process payload {} fail", value.getMetadata().getType(), e); + if (e instanceof AdminServerRuntimeException) { + return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException)e).getCode(), + e.getMessage())); + } + return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err")); + } + } + + public StreamObserver invokeBiStream(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(Payload value) { + Payload payload = process(value); + if (payload == null) { + return; + } + responseObserver.onNext(payload); + } + + @Override + public void onError(Throwable t) { + if (responseObserver instanceof ServerCallStreamObserver) { + if (!((ServerCallStreamObserver) responseObserver).isCancelled()) { + log.warn("admin gRPC server fail", t); + } + } + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + + public void invoke(Payload request, StreamObserver responseObserver) { + responseObserver.onNext(process(request)); + responseObserver.onCompleted(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java new file mode 100644 index 0000000000..764be8a9bb --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java @@ -0,0 +1,30 @@ +package com.apache.eventmesh.admin.server.web; + +import com.apache.eventmesh.admin.server.ComponentLifeCycle; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.payload.PayloadFactory; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Slf4j +public abstract class BaseServer implements ComponentLifeCycle { + static { + PayloadFactory.getInstance().init(); + } + @PostConstruct + public void init() throws Exception { + log.info("[{}] server starting at port [{}]", this.getClass().getSimpleName(), getPort()); + start(); + log.info("[{}] server started at port [{}]", this.getClass().getSimpleName(), getPort()); + } + + @PreDestroy + public void shutdown() { + log.info("[{}] server will destroy", this.getClass().getSimpleName()); + destroy(); + log.info("[{}] server has be destroy", this.getClass().getSimpleName()); + } + + public abstract int getPort(); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java index fee889a89f..1a1ccb17b7 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java @@ -1,19 +1,53 @@ package com.apache.eventmesh.admin.server.web; -import com.apache.eventmesh.admin.server.ComponentLifeCycle; -import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminBiStreamServiceGrpc; +import com.apache.eventmesh.admin.server.AdminServerProperties; +import io.grpc.Server; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; +import java.util.concurrent.TimeUnit; + @Controller -public class GrpcServer extends AdminBiStreamServiceGrpc.AdminBiStreamServiceImplBase implements ComponentLifeCycle { +@Slf4j +public class GrpcServer extends BaseServer { - @Override - public void start() { + @Autowired + AdminGrpcServer adminGrpcServer; + + @Autowired + AdminServerProperties properties; + + private Server server; + @Override + public void start() throws Exception { + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(getPort()).addService(adminGrpcServer); + if (properties.isEnableSSL()) { + serverBuilder.sslContext(null); + } + server = serverBuilder.build(); + server.start(); } @Override public void destroy() { + try { + if (server != null) { + server.shutdown(); + if(!server.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("[{}] server don't graceful stop in 30s, it will shutdown now", this.getClass().getSimpleName()); + server.shutdownNow(); + } + } + } catch (InterruptedException e) { + log.warn("destroy [{}] server fail", this.getClass().getSimpleName(), e); + } + } + @Override + public int getPort() { + return properties.getPort(); } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java new file mode 100644 index 0000000000..e212dd2c5a --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java @@ -0,0 +1,38 @@ +package com.apache.eventmesh.admin.server.web.db; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class DBThreadPool { + private final ThreadPoolExecutor executor = + new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, + Runtime.getRuntime().availableProcessors() * 2, 0L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + @PreDestroy + private void destroy() { + if (!executor.isShutdown()) { + try { + executor.shutdown(); + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("wait heart beat handler thread pool shutdown fail"); + } + } + } + + public ThreadPoolExecutor getExecutors() { + return executor; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java new file mode 100644 index 0000000000..9df7050483 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java @@ -0,0 +1,35 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_data_source + */ +@TableName(value ="event_mesh_data_source") +@Data +public class EventMeshDataSource implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private Integer dataType; + + private String description; + + private String configuration; + + private Integer createUid; + + private Integer updateUid; + + private Date createTime; + + private Date updateTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java new file mode 100644 index 0000000000..80174a239d --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java @@ -0,0 +1,29 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import lombok.Data; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.job.JobTransportType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.Map; + +@Data +public class EventMeshJobDetail { + private Integer id; + + private String name; + + private JobTransportType transportType; + + private Map sourceConnectorConfig; + + private String sourceConnectorDesc; + + private Map sinkConnectorConfig; + + private String sinkConnectorDesc; + + private RecordPosition position; + + private JobState state; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java new file mode 100644 index 0000000000..efb3c95d02 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java @@ -0,0 +1,41 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_job_info + */ +@TableName(value ="event_mesh_job_info") +@Data +public class EventMeshJobInfo implements Serializable { + @TableId(type = IdType.AUTO) + private Integer jobID; + + private String name; + + private Integer transportType; + + private Integer sourceData; + + private Integer targetData; + + private Integer state; + + private Integer jobType; + + private Integer createUid; + + private Integer updateUid; + + private Date createTime; + + private Date updateTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java new file mode 100644 index 0000000000..75a7c245e6 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java @@ -0,0 +1,35 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_mysql_position + */ +@TableName(value ="event_mesh_mysql_position") +@Data +public class EventMeshMysqlPosition implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private Integer jobID; + + private String address; + + private Long position; + + private Long timestamp; + + private String journalName; + + private Date createTime; + + private Date updateTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java new file mode 100644 index 0000000000..b1c51e7b4e --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java @@ -0,0 +1,29 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_position_reporter_history + */ +@TableName(value ="event_mesh_position_reporter_history") +@Data +public class EventMeshPositionReporterHistory implements Serializable { + @TableId(type = IdType.AUTO) + private Long id; + + private Integer job; + + private String record; + + private String address; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java new file mode 100644 index 0000000000..298a501739 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java @@ -0,0 +1,33 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_runtime_heartbeat + */ +@TableName(value ="event_mesh_runtime_heartbeat") +@Data +public class EventMeshRuntimeHeartbeat implements Serializable { + @TableId(type = IdType.AUTO) + private Long id; + + private String adminAddr; + + private String runtimeAddr; + + private Integer jobID; + + private String reportTime; + + private Date updateTime; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java new file mode 100644 index 0000000000..7d3e3f463c --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java @@ -0,0 +1,27 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_runtime_history + */ +@TableName(value ="event_mesh_runtime_history") +@Data +public class EventMeshRuntimeHistory implements Serializable { + @TableId(type = IdType.AUTO) + private Long id; + + private Integer job; + + private String address; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java new file mode 100644 index 0000000000..805d3642a0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_data_source】的数据库操作Mapper +* @createDate 2024-05-09 15:52:49 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource +*/ +@Mapper +public interface EventMeshDataSourceMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java new file mode 100644 index 0000000000..0082a9f2ec --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Mapper +* @createDate 2024-05-09 15:51:45 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo +*/ +@Mapper +public interface EventMeshJobInfoMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java new file mode 100644 index 0000000000..1b29508c1c --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition +*/ +@Mapper +public interface EventMeshMysqlPositionMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java new file mode 100644 index 0000000000..2cdc59f713 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory +*/ +@Mapper +public interface EventMeshPositionReporterHistoryMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java new file mode 100644 index 0000000000..a23ef04225 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat +*/ +@Mapper +public interface EventMeshRuntimeHeartbeatMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java new file mode 100644 index 0000000000..85fa51aec8 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory +*/ +@Mapper +public interface EventMeshRuntimeHistoryMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java new file mode 100644 index 0000000000..6b1e2d8f79 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_data_source】的数据库操作Service +* @createDate 2024-05-09 15:52:49 +*/ +public interface EventMeshDataSourceService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java new file mode 100644 index 0000000000..c9a1e972a4 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -0,0 +1,12 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service +* @createDate 2024-05-09 15:51:45 +*/ +public interface EventMeshJobInfoService extends IService { +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java new file mode 100644 index 0000000000..83597b0472 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java @@ -0,0 +1,12 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshMysqlPositionService extends IService { +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java new file mode 100644 index 0000000000..adaac43959 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshPositionReporterHistoryService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java new file mode 100644 index 0000000000..3b9b8465c8 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java @@ -0,0 +1,12 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshRuntimeHeartbeatService extends IService { +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java new file mode 100644 index 0000000000..0da277f268 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshRuntimeHistoryService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java new file mode 100644 index 0000000000..0d9f4f2960 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java @@ -0,0 +1,22 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshDataSourceMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_data_source】的数据库操作Service实现 +* @createDate 2024-05-09 15:52:49 +*/ +@Service +public class EventMeshDataSourceServiceImpl extends ServiceImpl + implements EventMeshDataSourceService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java new file mode 100644 index 0000000000..eb2a6bf601 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -0,0 +1,23 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service实现 +* @createDate 2024-05-09 15:51:45 +*/ +@Service +@Slf4j +public class EventMeshJobInfoServiceImpl extends ServiceImpl + implements EventMeshJobInfoService{ +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java new file mode 100644 index 0000000000..a3bfa4770b --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -0,0 +1,23 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshMysqlPositionServiceImpl extends ServiceImpl + implements EventMeshMysqlPositionService{ +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java new file mode 100644 index 0000000000..071d44f663 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java @@ -0,0 +1,22 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshPositionReporterHistoryMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +public class EventMeshPositionReporterHistoryServiceImpl extends ServiceImpl + implements EventMeshPositionReporterHistoryService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java new file mode 100644 index 0000000000..824bd3aec6 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java @@ -0,0 +1,23 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshRuntimeHeartbeatServiceImpl extends ServiceImpl + implements EventMeshRuntimeHeartbeatService{ +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java new file mode 100644 index 0000000000..c3883e55b1 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java @@ -0,0 +1,22 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHistoryMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +public class EventMeshRuntimeHistoryServiceImpl extends ServiceImpl + implements EventMeshRuntimeHistoryService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java deleted file mode 100644 index 2a10de9aca..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java +++ /dev/null @@ -1,263 +0,0 @@ -package com.apache.eventmesh.admin.server.web.generated; - -import org.apache.eventmesh.common.grpc.EventMeshAdminService; -import org.apache.eventmesh.common.grpc.Payload; - -import static io.grpc.MethodDescriptor.generateFullMethodName; - -/** - */ -@javax.annotation.Generated( - value = "by gRPC proto compiler (version 1.40.0)", - comments = "Source: event_mesh_admin_service.proto") -@io.grpc.stub.annotations.GrpcGenerated -public final class AdminBiStreamServiceGrpc { - - private AdminBiStreamServiceGrpc() {} - - public static final String SERVICE_NAME = "AdminBiStreamService"; - - // Static method descriptors that strictly reflect the proto. - private static volatile io.grpc.MethodDescriptor getInvokeBiStreamMethod; - - @io.grpc.stub.annotations.RpcMethod( - fullMethodName = SERVICE_NAME + '/' + "invokeBiStream", - requestType = Payload.class, - responseType = Payload.class, - methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING) - public static io.grpc.MethodDescriptor getInvokeBiStreamMethod() { - io.grpc.MethodDescriptor getInvokeBiStreamMethod; - if ((getInvokeBiStreamMethod = AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) { - synchronized (AdminBiStreamServiceGrpc.class) { - if ((getInvokeBiStreamMethod = AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) { - AdminBiStreamServiceGrpc.getInvokeBiStreamMethod = getInvokeBiStreamMethod = - io.grpc.MethodDescriptor.newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "invokeBiStream")) - .setSampledToLocalTracing(true) - .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - Payload.getDefaultInstance())) - .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - Payload.getDefaultInstance())) - .setSchemaDescriptor(new AdminBiStreamServiceMethodDescriptorSupplier("invokeBiStream")) - .build(); - } - } - } - return getInvokeBiStreamMethod; - } - - /** - * Creates a new async stub that supports all call types for the service - */ - public static AdminBiStreamServiceStub newStub(io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminBiStreamServiceStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceStub(channel, callOptions); - } - }; - return AdminBiStreamServiceStub.newStub(factory, channel); - } - - /** - * Creates a new blocking-style stub that supports unary and streaming output calls on the service - */ - public static AdminBiStreamServiceBlockingStub newBlockingStub( - io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminBiStreamServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceBlockingStub(channel, callOptions); - } - }; - return AdminBiStreamServiceBlockingStub.newStub(factory, channel); - } - - /** - * Creates a new ListenableFuture-style stub that supports unary calls on the service - */ - public static AdminBiStreamServiceFutureStub newFutureStub( - io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminBiStreamServiceFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceFutureStub(channel, callOptions); - } - }; - return AdminBiStreamServiceFutureStub.newStub(factory, channel); - } - - /** - */ - public static abstract class AdminBiStreamServiceImplBase implements io.grpc.BindableService { - - /** - */ - public io.grpc.stub.StreamObserver invokeBiStream( - io.grpc.stub.StreamObserver responseObserver) { - return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getInvokeBiStreamMethod(), responseObserver); - } - - @Override public final io.grpc.ServerServiceDefinition bindService() { - return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) - .addMethod( - getInvokeBiStreamMethod(), - io.grpc.stub.ServerCalls.asyncBidiStreamingCall( - new MethodHandlers< - Payload, - Payload>( - this, METHODID_INVOKE_BI_STREAM))) - .build(); - } - } - - /** - */ - public static final class AdminBiStreamServiceStub extends io.grpc.stub.AbstractAsyncStub { - private AdminBiStreamServiceStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminBiStreamServiceStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceStub(channel, callOptions); - } - - /** - */ - public io.grpc.stub.StreamObserver invokeBiStream( - io.grpc.stub.StreamObserver responseObserver) { - return io.grpc.stub.ClientCalls.asyncBidiStreamingCall( - getChannel().newCall(getInvokeBiStreamMethod(), getCallOptions()), responseObserver); - } - } - - /** - */ - public static final class AdminBiStreamServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub { - private AdminBiStreamServiceBlockingStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminBiStreamServiceBlockingStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceBlockingStub(channel, callOptions); - } - } - - /** - */ - public static final class AdminBiStreamServiceFutureStub extends io.grpc.stub.AbstractFutureStub { - private AdminBiStreamServiceFutureStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminBiStreamServiceFutureStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminBiStreamServiceFutureStub(channel, callOptions); - } - } - - private static final int METHODID_INVOKE_BI_STREAM = 0; - - private static final class MethodHandlers implements - io.grpc.stub.ServerCalls.UnaryMethod, - io.grpc.stub.ServerCalls.ServerStreamingMethod, - io.grpc.stub.ServerCalls.ClientStreamingMethod, - io.grpc.stub.ServerCalls.BidiStreamingMethod { - private final AdminBiStreamServiceImplBase serviceImpl; - private final int methodId; - - MethodHandlers(AdminBiStreamServiceImplBase serviceImpl, int methodId) { - this.serviceImpl = serviceImpl; - this.methodId = methodId; - } - - @Override - @SuppressWarnings("unchecked") - public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { - switch (methodId) { - default: - throw new AssertionError(); - } - } - - @Override - @SuppressWarnings("unchecked") - public io.grpc.stub.StreamObserver invoke( - io.grpc.stub.StreamObserver responseObserver) { - switch (methodId) { - case METHODID_INVOKE_BI_STREAM: - return (io.grpc.stub.StreamObserver) serviceImpl.invokeBiStream( - (io.grpc.stub.StreamObserver) responseObserver); - default: - throw new AssertionError(); - } - } - } - - private static abstract class AdminBiStreamServiceBaseDescriptorSupplier - implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier { - AdminBiStreamServiceBaseDescriptorSupplier() {} - - @Override - public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() { - return EventMeshAdminService.getDescriptor(); - } - - @Override - public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() { - return getFileDescriptor().findServiceByName("AdminBiStreamService"); - } - } - - private static final class AdminBiStreamServiceFileDescriptorSupplier - extends AdminBiStreamServiceBaseDescriptorSupplier { - AdminBiStreamServiceFileDescriptorSupplier() {} - } - - private static final class AdminBiStreamServiceMethodDescriptorSupplier - extends AdminBiStreamServiceBaseDescriptorSupplier - implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; - - AdminBiStreamServiceMethodDescriptorSupplier(String methodName) { - this.methodName = methodName; - } - - @Override - public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() { - return getServiceDescriptor().findMethodByName(methodName); - } - } - - private static volatile io.grpc.ServiceDescriptor serviceDescriptor; - - public static io.grpc.ServiceDescriptor getServiceDescriptor() { - io.grpc.ServiceDescriptor result = serviceDescriptor; - if (result == null) { - synchronized (AdminBiStreamServiceGrpc.class) { - result = serviceDescriptor; - if (result == null) { - serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME) - .setSchemaDescriptor(new AdminBiStreamServiceFileDescriptorSupplier()) - .addMethod(getInvokeBiStreamMethod()) - .build(); - } - } - } - return result; - } -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java deleted file mode 100644 index 61b418e900..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java +++ /dev/null @@ -1,279 +0,0 @@ -package com.apache.eventmesh.admin.server.web.generated; - -import org.apache.eventmesh.common.grpc.EventMeshAdminService; -import org.apache.eventmesh.common.grpc.Payload; - -import static io.grpc.MethodDescriptor.generateFullMethodName; - -/** - */ -@javax.annotation.Generated( - value = "by gRPC proto compiler (version 1.40.0)", - comments = "Source: event_mesh_admin_service.proto") -@io.grpc.stub.annotations.GrpcGenerated -public final class AdminServiceGrpc { - - private AdminServiceGrpc() {} - - public static final String SERVICE_NAME = "AdminService"; - - // Static method descriptors that strictly reflect the proto. - private static volatile io.grpc.MethodDescriptor getInvokeMethod; - - @io.grpc.stub.annotations.RpcMethod( - fullMethodName = SERVICE_NAME + '/' + "invoke", - requestType = Payload.class, - responseType = Payload.class, - methodType = io.grpc.MethodDescriptor.MethodType.UNARY) - public static io.grpc.MethodDescriptor getInvokeMethod() { - io.grpc.MethodDescriptor getInvokeMethod; - if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) { - synchronized (AdminServiceGrpc.class) { - if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) { - AdminServiceGrpc.getInvokeMethod = getInvokeMethod = - io.grpc.MethodDescriptor.newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "invoke")) - .setSampledToLocalTracing(true) - .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - Payload.getDefaultInstance())) - .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - Payload.getDefaultInstance())) - .setSchemaDescriptor(new AdminServiceMethodDescriptorSupplier("invoke")) - .build(); - } - } - } - return getInvokeMethod; - } - - /** - * Creates a new async stub that supports all call types for the service - */ - public static AdminServiceStub newStub(io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminServiceStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceStub(channel, callOptions); - } - }; - return AdminServiceStub.newStub(factory, channel); - } - - /** - * Creates a new blocking-style stub that supports unary and streaming output calls on the service - */ - public static AdminServiceBlockingStub newBlockingStub( - io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceBlockingStub(channel, callOptions); - } - }; - return AdminServiceBlockingStub.newStub(factory, channel); - } - - /** - * Creates a new ListenableFuture-style stub that supports unary calls on the service - */ - public static AdminServiceFutureStub newFutureStub( - io.grpc.Channel channel) { - io.grpc.stub.AbstractStub.StubFactory factory = - new io.grpc.stub.AbstractStub.StubFactory() { - @Override - public AdminServiceFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceFutureStub(channel, callOptions); - } - }; - return AdminServiceFutureStub.newStub(factory, channel); - } - - /** - */ - public static abstract class AdminServiceImplBase implements io.grpc.BindableService { - - /** - */ - public void invoke(Payload request, - io.grpc.stub.StreamObserver responseObserver) { - io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), responseObserver); - } - - @Override public final io.grpc.ServerServiceDefinition bindService() { - return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) - .addMethod( - getInvokeMethod(), - io.grpc.stub.ServerCalls.asyncUnaryCall( - new MethodHandlers< - Payload, - Payload>( - this, METHODID_INVOKE))) - .build(); - } - } - - /** - */ - public static final class AdminServiceStub extends io.grpc.stub.AbstractAsyncStub { - private AdminServiceStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminServiceStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceStub(channel, callOptions); - } - - /** - */ - public void invoke(Payload request, - io.grpc.stub.StreamObserver responseObserver) { - io.grpc.stub.ClientCalls.asyncUnaryCall( - getChannel().newCall(getInvokeMethod(), getCallOptions()), request, responseObserver); - } - } - - /** - */ - public static final class AdminServiceBlockingStub extends io.grpc.stub.AbstractBlockingStub { - private AdminServiceBlockingStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminServiceBlockingStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceBlockingStub(channel, callOptions); - } - - /** - */ - public Payload invoke(Payload request) { - return io.grpc.stub.ClientCalls.blockingUnaryCall( - getChannel(), getInvokeMethod(), getCallOptions(), request); - } - } - - /** - */ - public static final class AdminServiceFutureStub extends io.grpc.stub.AbstractFutureStub { - private AdminServiceFutureStub( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected AdminServiceFutureStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { - return new AdminServiceFutureStub(channel, callOptions); - } - - /** - */ - public com.google.common.util.concurrent.ListenableFuture invoke( - Payload request) { - return io.grpc.stub.ClientCalls.futureUnaryCall( - getChannel().newCall(getInvokeMethod(), getCallOptions()), request); - } - } - - private static final int METHODID_INVOKE = 0; - - private static final class MethodHandlers implements - io.grpc.stub.ServerCalls.UnaryMethod, - io.grpc.stub.ServerCalls.ServerStreamingMethod, - io.grpc.stub.ServerCalls.ClientStreamingMethod, - io.grpc.stub.ServerCalls.BidiStreamingMethod { - private final AdminServiceImplBase serviceImpl; - private final int methodId; - - MethodHandlers(AdminServiceImplBase serviceImpl, int methodId) { - this.serviceImpl = serviceImpl; - this.methodId = methodId; - } - - @Override - @SuppressWarnings("unchecked") - public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { - switch (methodId) { - case METHODID_INVOKE: - serviceImpl.invoke((Payload) request, - (io.grpc.stub.StreamObserver) responseObserver); - break; - default: - throw new AssertionError(); - } - } - - @Override - @SuppressWarnings("unchecked") - public io.grpc.stub.StreamObserver invoke( - io.grpc.stub.StreamObserver responseObserver) { - switch (methodId) { - default: - throw new AssertionError(); - } - } - } - - private static abstract class AdminServiceBaseDescriptorSupplier - implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier { - AdminServiceBaseDescriptorSupplier() {} - - @Override - public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() { - return EventMeshAdminService.getDescriptor(); - } - - @Override - public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() { - return getFileDescriptor().findServiceByName("AdminService"); - } - } - - private static final class AdminServiceFileDescriptorSupplier - extends AdminServiceBaseDescriptorSupplier { - AdminServiceFileDescriptorSupplier() {} - } - - private static final class AdminServiceMethodDescriptorSupplier - extends AdminServiceBaseDescriptorSupplier - implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { - private final String methodName; - - AdminServiceMethodDescriptorSupplier(String methodName) { - this.methodName = methodName; - } - - @Override - public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() { - return getServiceDescriptor().findMethodByName(methodName); - } - } - - private static volatile io.grpc.ServiceDescriptor serviceDescriptor; - - public static io.grpc.ServiceDescriptor getServiceDescriptor() { - io.grpc.ServiceDescriptor result = serviceDescriptor; - if (result == null) { - synchronized (AdminServiceGrpc.class) { - result = serviceDescriptor; - if (result == null) { - serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME) - .setSchemaDescriptor(new AdminServiceFileDescriptorSupplier()) - .addMethod(getInvokeMethod()) - .build(); - } - } - } - return result; - } -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java new file mode 100644 index 0000000000..b1c9518c7f --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.handler; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; + +public abstract class BaseRequestHandler { + public BaseRemoteResponse handlerRequest(T request, Metadata metadata) { + return handler(request, metadata); + } + + protected abstract S handler(T request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java new file mode 100644 index 0000000000..33535e7f3d --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java @@ -0,0 +1,47 @@ +package com.apache.eventmesh.admin.server.web.handler; + +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import java.lang.reflect.ParameterizedType; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class RequestHandlerFactory implements ApplicationListener { + + private final Map> handlers = + new ConcurrentHashMap<>(); + + public BaseRequestHandler getHandler(String type) { + return handlers.get(type); + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void onApplicationEvent(ContextRefreshedEvent event) { + Map beans = + event.getApplicationContext().getBeansOfType(BaseRequestHandler.class); + + for (BaseRequestHandler requestHandler : beans.values()) { + Class clazz = requestHandler.getClass(); + boolean skip = false; + while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) { + if (clazz.getSuperclass().equals(Object.class)) { + skip = true; + break; + } + clazz = clazz.getSuperclass(); + } + if (skip) { + continue; + } + + Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0]; + handlers.putIfAbsent(tClass.getSimpleName(), requestHandler); + } + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java new file mode 100644 index 0000000000..9362581eb7 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java @@ -0,0 +1,51 @@ +package com.apache.eventmesh.admin.server.web.handler.impl; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class FetchJobRequestHandler extends BaseRequestHandler { + + @Autowired + EventMeshJobInfoBizService jobInfoBizService; + + @Override + public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) { + if (StringUtils.isBlank(request.getJobID())) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "job id is empty"); + } + int jobID; + try { + jobID = Integer.parseInt(request.getJobID()); + } catch (NumberFormatException e) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s", + request.getJobID())); + } + FetchJobResponse response = FetchJobResponse.successResponse(); + EventMeshJobDetail detail = jobInfoBizService.getJobDetail(request, metadata); + if (detail == null) { + return response; + } + response.setId(detail.getId()); + response.setName(detail.getName()); + response.setSourceConnectorConfig(detail.getSourceConnectorConfig()); + response.setSourceConnectorDesc(detail.getSourceConnectorDesc()); + response.setTransportType(detail.getTransportType()); + response.setSinkConnectorConfig(detail.getSinkConnectorConfig()); + response.setSourceConnectorDesc(detail.getSinkConnectorDesc()); + response.setState(detail.getState()); + response.setPosition(detail.getPosition()); + return response; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java new file mode 100644 index 0000000000..bc5bc20508 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java @@ -0,0 +1,36 @@ +package com.apache.eventmesh.admin.server.web.handler.impl; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import com.apache.eventmesh.admin.server.web.db.DBThreadPool; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.response.FetchPositionResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class FetchPositionHandler extends BaseRequestHandler { + + @Autowired + DBThreadPool executor; + + @Autowired + EventMeshPositionBizService positionBizService; + + @Override + protected FetchPositionResponse handler(FetchPositionRequest request, Metadata metadata) { + if (request.getDataSourceType() == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); + } + if (StringUtils.isBlank(request.getJobID())) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } + return FetchPositionResponse.successResponse(positionBizService.getPosition(request, metadata)); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java new file mode 100644 index 0000000000..5011158f3e --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java @@ -0,0 +1,50 @@ +package com.apache.eventmesh.admin.server.web.handler.impl; + +import com.apache.eventmesh.admin.server.web.db.DBThreadPool; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.service.heatbeat.EventMeshRuntimeHeartbeatBizService; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.apache.eventmesh.common.utils.IPUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ReportHeartBeatHandler extends BaseRequestHandler { + @Autowired + EventMeshRuntimeHeartbeatBizService heartbeatBizService; + + @Autowired + DBThreadPool executor; + + @Override + protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + executor.getExecutors().execute(() -> { + EventMeshRuntimeHeartbeat heartbeat = new EventMeshRuntimeHeartbeat(); + int job; + try { + job = Integer.parseInt(request.getJobID()); + } catch (NumberFormatException e) { + log.warn("runtime {} report heartbeat fail, illegal job id {}", request.getAddress(), request.getJobID()); + return; + } + heartbeat.setJobID(job); + heartbeat.setReportTime(request.getReportedTimeStamp()); + heartbeat.setAdminAddr(IPUtils.getLocalAddress()); + heartbeat.setRuntimeAddr(request.getAddress()); + try { + if (!heartbeatBizService.saveOrUpdateByRuntimeAddress(heartbeat)) { + log.warn("save or update heartbeat request [{}] fail", request); + } + } catch (Exception e) { + log.warn("save or update heartbeat request [{}] fail", request, e); + } + }); + + return new EmptyAckResponse(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java new file mode 100644 index 0000000000..168b4980bf --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java @@ -0,0 +1,81 @@ +package com.apache.eventmesh.admin.server.web.handler.impl; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import com.apache.eventmesh.admin.server.web.db.DBThreadPool; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; +import com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ReportPositionHandler extends BaseRequestHandler { + @Autowired + EventMeshJobInfoBizService jobInfoBizService; + + @Autowired + DBThreadPool executor; + + @Autowired + EventMeshPositionBizService positionBizService; + + + @Override + protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) { + if (request.getDataSourceType() == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); + } + if (StringUtils.isBlank(request.getJobID())) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } + if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); + } + int jobID; + + try { + jobID = Integer.parseInt(request.getJobID()); + } catch (NumberFormatException e) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id [%s] format", + request.getJobID())); + } + + positionBizService.isValidatePositionRequest(request.getDataSourceType()); + + executor.getExecutors().execute(() -> { + try { + boolean reported = positionBizService.reportPosition(request, metadata); + if (reported) { + if (log.isDebugEnabled()) { + log.debug("handle runtime [{}] report data type [{}] job [{}] position [{}] success", + request.getAddress(), request.getDataSourceType(), request.getJobID(), + request.getRecordPositionList()); + } + } else { + log.warn("handle runtime [{}] report data type [{}] job [{}] position [{}] fail", + request.getAddress(), request.getDataSourceType(), request.getJobID(), + request.getRecordPositionList()); + } + } catch (Exception e) { + log.warn("handle position request fail, request [{}]", request, e); + } finally { + try { + if (!jobInfoBizService.updateJobState(jobID, request.getState())) { + log.warn("update job [{}] state to [{}] fail", jobID, request.getState()); + } + } catch (Exception e) { + log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), + request.getDataSourceType(), request.getState(), e); + } + } + }); + return new EmptyAckResponse(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java new file mode 100644 index 0000000000..f5fd4ae8ca --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java @@ -0,0 +1,63 @@ +package com.apache.eventmesh.admin.server.web.service.heatbeat; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshRuntimeHeartbeatBizService { + + @Autowired + EventMeshRuntimeHistoryService historyService; + + @Autowired + EventMeshRuntimeHeartbeatService heartbeatService; + + public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) { + EventMeshRuntimeHeartbeat old = heartbeatService.getOne(Wrappers.query().eq( + "runtimeAddr", + entity.getRuntimeAddr())); + if (old == null) { + return heartbeatService.save(entity); + } else { + if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) { + log.info("update heartbeat record ignore, current report time late than db, job " + + "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr()); + return true; + } + try { + return heartbeatService.update(entity, Wrappers.update().eq("updateTime", + old.getUpdateTime())); + } finally { + if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { + EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); + history.setAddress(old.getAdminAddr()); + history.setJob(old.getJobID()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save runtime job changed history fail", e); + } + + log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), + entity.getJobID()); + } + } + } + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java new file mode 100644 index 0000000000..923dc086de --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java @@ -0,0 +1,111 @@ +package com.apache.eventmesh.admin.server.web.service.job; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.job.JobTransportType; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Map; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service实现 +* @createDate 2024-05-09 15:51:45 +*/ +@Service +@Slf4j +public class EventMeshJobInfoBizService { + + @Autowired + EventMeshJobInfoService jobInfoService; + + @Autowired + EventMeshDataSourceService dataSourceService; + + @Autowired + EventMeshPositionBizService positionBizService; + + public boolean updateJobState(Integer jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobID(jobID); + jobInfo.setState(state.ordinal()); + jobInfoService.update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE.ordinal(), + JobState.COMPLETE.ordinal())); + return true; + } + + public EventMeshJobDetail getJobDetail(FetchJobRequest request, Metadata metadata) { + if (request == null) { + return null; + } + EventMeshJobInfo job = jobInfoService.getById(request.getJobID()); + if (job == null) { + return null; + } + EventMeshJobDetail detail = new EventMeshJobDetail(); + detail.setId(job.getJobID()); + detail.setName(job.getName()); + EventMeshDataSource source = dataSourceService.getById(job.getSourceData()); + EventMeshDataSource target = dataSourceService.getById(job.getTargetData()); + if (source != null) { + if (!StringUtils.isBlank(source.getConfiguration())) { + try { + detail.setSourceConnectorConfig(JsonUtils.parseTypeReferenceObject(source.getConfiguration(), + new TypeReference>() {})); + } catch (Exception e) { + log.warn("parse source config id [{}] fail", job.getSourceData(), e); + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal source data source config"); + } + } + detail.setSourceConnectorDesc(source.getDescription()); + if (source.getDataType() != null) { + detail.setPosition(positionBizService.getPositionByJobID(job.getJobID(), + DataSourceType.getDataSourceType(source.getDataType()))); + + } + } + if (target != null) { + if (!StringUtils.isBlank(target.getConfiguration())) { + try { + detail.setSinkConnectorConfig(JsonUtils.parseTypeReferenceObject(target.getConfiguration(), + new TypeReference>() {})); + } catch (Exception e) { + log.warn("parse sink config id [{}] fail", job.getSourceData(), e); + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal target data sink config"); + } + } + detail.setSinkConnectorDesc(target.getDescription()); + } + + JobState state = JobState.fromIndex(job.getState()); + if (state == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal job state in db"); + } + detail.setState(state); + detail.setTransportType(JobTransportType.getJobTransportType(job.getTransportType())); + return detail; + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java new file mode 100644 index 0000000000..03014389a8 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java @@ -0,0 +1,61 @@ +package com.apache.eventmesh.admin.server.web.service.position; + +import com.apache.eventmesh.admin.server.AdminServerRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class EventMeshPositionBizService { + @Autowired + PositionHandlerFactory factory; + + // called isValidateReportRequest before call this + public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) { + if (request == null) { + return null; + } + isValidatePositionRequest(request.getDataSourceType()); + IFetchPositionHandler handler = factory.getHandler(request.getDataSourceType()); + return handler.handler(request, metadata); + } + + public void isValidatePositionRequest(DataSourceType type) { + if (type == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "data source type is null"); + } + IReportPositionHandler handler = factory.getHandler(type); + if (handler == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal data base " + + "type [%s], it not match any report position handler", type)); + } + } + + // called isValidateReportRequest before call this + public boolean reportPosition(ReportPositionRequest request, Metadata metadata) { + if (request == null) { + return false; + } + isValidatePositionRequest(request.getDataSourceType()); + IReportPositionHandler handler = factory.getHandler(request.getDataSourceType()); + return handler.handler(request, metadata); + } + + public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) { + if (jobID == null || type == null) { + return null; + } + isValidatePositionRequest(type); + PositionHandler handler = factory.getHandler(type); + FetchPositionRequest request = new FetchPositionRequest(); + request.setJobID(String.valueOf(jobID)); + return handler.handler(request, null); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java new file mode 100644 index 0000000000..521f07d1b0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java @@ -0,0 +1,9 @@ +package com.apache.eventmesh.admin.server.web.service.position; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; + +public interface IFetchPositionHandler { + RecordPosition handler(FetchPositionRequest request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java new file mode 100644 index 0000000000..5c7d1ab41f --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java @@ -0,0 +1,8 @@ +package com.apache.eventmesh.admin.server.web.service.position; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; + +public interface IReportPositionHandler { + boolean handler(ReportPositionRequest request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java new file mode 100644 index 0000000000..d8ca717978 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java @@ -0,0 +1,7 @@ +package com.apache.eventmesh.admin.server.web.service.position; + +import org.apache.eventmesh.common.remote.job.DataSourceType; + +public abstract class PositionHandler implements IReportPositionHandler,IFetchPositionHandler { + protected abstract DataSourceType getSourceType(); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java new file mode 100644 index 0000000000..6dedb1e3d0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java @@ -0,0 +1,35 @@ +package com.apache.eventmesh.admin.server.web.service.position; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +public class PositionHandlerFactory implements ApplicationListener { + private final Map handlers = + new ConcurrentHashMap<>(); + public PositionHandler getHandler(DataSourceType type) { + return handlers.get(type); + } + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + Map beans = + event.getApplicationContext().getBeansOfType(PositionHandler.class); + + for (PositionHandler handler: beans.values()) { + DataSourceType type = handler.getSourceType(); + if (handlers.containsKey(type)) { + log.warn("data source type [{}] handler already exists", type); + continue; + } + handlers.put(type, handler); + } + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java new file mode 100644 index 0000000000..f72b61f5d9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -0,0 +1,140 @@ +package com.apache.eventmesh.admin.server.web.service.position.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import com.apache.eventmesh.admin.server.web.service.position.PositionHandler; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +@Slf4j +public class MysqlPositionHandler extends PositionHandler { + @Autowired + EventMeshMysqlPositionService positionService; + + @Autowired + EventMeshPositionReporterHistoryService historyService; + + @Override + protected DataSourceType getSourceType() { + return DataSourceType.MYSQL; + } + + public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { + EventMeshMysqlPosition old = positionService.getOne(Wrappers.query().eq("jobId", + position.getJobID())); + if (old == null) { + return positionService.save(position); + } else { + if (old.getPosition() >= position.getPosition()) { + log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]", + position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress()); + return true; + } + try { + return positionService.update(position, Wrappers.update().eq("updateTime", + old.getUpdateTime())); + } finally { + if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { + EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); + history.setRecord(JsonUtils.toJSONString(position)); + history.setJob(old.getJobID()); + history.setAddress(old.getAddress()); + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old " + + "[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e); + } + } + } + } + } + + @Override + public boolean handler(ReportPositionRequest request, Metadata metadata) { + for (int i = 0; i < 3; i++) { + try { + List recordPositionList = request.getRecordPositionList(); + RecordPosition recordPosition = recordPositionList.get(0); + if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) { + log.warn("report mysql position, but record-partition/partition/offset is null"); + return false; + } + if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) { + log.warn("report mysql position, but record partition class [{}] not match [{}]", + recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class); + return false; + } + if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) { + log.warn("report mysql position, but record offset class [{}] not match [{}]", + recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class); + return false; + } + CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); + CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); + EventMeshMysqlPosition position = new EventMeshMysqlPosition(); + position.setJobID(Integer.parseInt(request.getJobID())); + position.setAddress(request.getAddress()); + if (offset != null) { + position.setPosition(offset.getOffset()); + } + if (partition != null) { + position.setTimestamp(partition.getTimeStamp()); + position.setJournalName(partition.getJournalName()); + } + if (!saveOrUpdateByJob(position)) { + log.warn("update job position fail [{}]", request); + return false; + } + return true; + } catch (DuplicateKeyException e) { + log.warn("concurrent report position job [{}], it will try again", request.getJobID()); + } catch (Exception e) { + log.warn("save position job [{}] fail", request.getJobID(), e); + return false; + } + try { + Thread.sleep(200); + } catch (InterruptedException ignore) { + log.warn("save position thread interrupted, [{}]", request); + return true; + } + } + return false; + } + + @Override + public RecordPosition handler(FetchPositionRequest request, Metadata metadata) { + EventMeshMysqlPosition position = positionService.getOne(Wrappers.query().eq("jobID" + , request.getJobID())); + RecordPosition recordPosition = null; + if (position != null) { + CanalRecordPartition partition = new CanalRecordPartition(); + partition.setTimeStamp(position.getTimestamp()); + partition.setJournalName(position.getJournalName()); + CanalRecordOffset offset = new CanalRecordOffset(); + offset.setOffset(position.getPosition()); + recordPosition = new RecordPosition(); + recordPosition.setRecordPartition(partition); + recordPosition.setRecordOffset(offset); + } + return recordPosition; + } +} diff --git a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService deleted file mode 100644 index 656fec8f37..0000000000 --- a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -nacos=com.apache.eventmesh.admin.server.registry.NacosDiscoveryService \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/resources/META-INF/spring.factories b/eventmesh-admin-server/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..ced02f95f4 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.apache.eventmesh.admin.server.AdminServerProperties \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/src/main/resources/application.yaml index aa72432b64..c1f10e1efe 100644 --- a/eventmesh-admin-server/src/main/resources/application.yaml +++ b/eventmesh-admin-server/src/main/resources/application.yaml @@ -1,8 +1,15 @@ spring: datasource: - url: jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false - username: sodafang - password: asdfasdf + url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true + username: root + password: mike920830 driver-class-name: com.mysql.cj.jdbc.Driver -mybatis: - mapper-locations: classpath:mapper/*.xml \ No newline at end of file +mybatis-plus: + mapper-locations: classpath:mapper/*.xml + configuration: + map-underscore-to-camel-case: false + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl +event-mesh: + admin-server: + service-name: DEFAULT_GROUP@@em_adm_server + port: 8081 \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties b/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties new file mode 100644 index 0000000000..ceb1f59b3b --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties @@ -0,0 +1,2 @@ +eventMesh.server.retry.plugin.type=nacos +eventMesh.registry.plugin.server-addr=localhost:8848 diff --git a/eventmesh-admin-server/src/main/resources/eventmesh.sql b/eventmesh-admin-server/src/main/resources/eventmesh.sql new file mode 100644 index 0000000000..37217c40b0 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/eventmesh.sql @@ -0,0 +1,114 @@ +-- -------------------------------------------------------- +-- 主机: 127.0.0.1 +-- 服务器版本: 8.0.36 - MySQL Community Server - GPL +-- 服务器操作系统: Win64 +-- HeidiSQL 版本: 11.3.0.6295 +-- -------------------------------------------------------- + +/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; +/*!40101 SET NAMES utf8 */; +/*!50503 SET NAMES utf8mb4 */; +/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; +/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; +/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; + + +-- 导出 eventmesh 的数据库结构 +CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */; +USE `eventmesh`; + +-- 导出 表 eventmesh.event_mesh_data_source 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `dataType` int unsigned NOT NULL, + `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `createUid` int NOT NULL, + `updateUid` int NOT NULL, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_job_info 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( + `jobID` int unsigned NOT NULL AUTO_INCREMENT, + `name` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `transportType` int unsigned DEFAULT NULL COMMENT 'JobTransportType', + `sourceData` int unsigned DEFAULT NULL COMMENT 'data_source表', + `targetData` int unsigned DEFAULT NULL, + `state` tinyint unsigned NOT NULL COMMENT 'JobState', + `jobType` tinyint unsigned NOT NULL COMMENT 'connector,mesh,func,...', + `createUid` int unsigned NOT NULL, + `updateUid` int unsigned NOT NULL, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`jobID`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_mysql_position 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `jobID` int unsigned NOT NULL, + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `position` bigint DEFAULT NULL, + `timestamp` bigint DEFAULT NULL, + `journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_position_reporter_history 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `job` int NOT NULL, + `record` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `address` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `job` (`job`), + KEY `address` (`address`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录'; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `jobID` int unsigned DEFAULT NULL, + `reportTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间', + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `runtimeAddr` (`runtimeAddr`), + KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_runtime_history 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `job` int NOT NULL, + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `address` (`address`) +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更'; + +-- 数据导出被取消选择。 + +/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; +/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */; +/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; +/*!40111 SET SQL_NOTES=IFNULL(@OLD_SQL_NOTES, 1) */; diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml new file mode 100644 index 0000000000..5409806466 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + id,dataType,address, + description,configuration,createUid, + updateUid,createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml new file mode 100644 index 0000000000..fd8193b778 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + jobID,name,transportType, + sourceData,targetData,state, + runtimeType,createUid, + updateUid,createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml new file mode 100644 index 0000000000..9851315a3d --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + id,jobID,address, + position,timestamp,journalName, + createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml new file mode 100644 index 0000000000..624b5f9ea5 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + id,job,record, + address,createTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml new file mode 100644 index 0000000000..3b49c48582 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + id,adminAddr,runtimeAddr, + jobID,reportTime,updateTime, + createTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml new file mode 100644 index 0000000000..73ab65a67a --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + id,job,address, + createTime + +