diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index d0d32a5c65..525fe02c0d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -152,9 +152,9 @@ public List handler(FetchPositionRequest request, Metadata metad CanalRecordPartition partition = new CanalRecordPartition(); partition.setTimeStamp(position.getTimestamp()); partition.setJournalName(position.getJournalName()); + recordPosition.setRecordPartition(partition); CanalRecordOffset offset = new CanalRecordOffset(); offset.setOffset(position.getPosition()); - recordPosition.setRecordPartition(partition); recordPosition.setRecordOffset(offset); recordPositionList.add(recordPosition); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 1a47a05211..acd491ba64 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -19,10 +19,6 @@ import org.apache.eventmesh.connector.canal.template.MysqlSqlTemplate; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.support.lob.LobHandler; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 56b3a59675..0fade897f6 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -45,7 +45,7 @@ public class RuntimeInstance { private Map adminServerInfoMap = new HashMap<>(); -// private final RegistryService registryService; + private final RegistryService registryService; private Runtime runtime; @@ -57,20 +57,20 @@ public class RuntimeInstance { public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; -// this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); + this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); } public void init() throws Exception { -// registryService.init(); -// QueryInstances queryInstances = new QueryInstances(); -// queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); -// queryInstances.setHealth(true); -// List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); -// if (!adminServerRegisterInfoList.isEmpty()) { -// adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); -// } else { -// throw new RuntimeException("admin server address is empty, please check"); -// } + registryService.init(); + QueryInstances queryInstances = new QueryInstances(); + queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); + queryInstances.setHealth(true); + List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); + if (!adminServerRegisterInfoList.isEmpty()) { + adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); + } else { + throw new RuntimeException("admin server address is empty, please check"); + } runtimeInstanceConfig.setAdminServerAddr(adminServerAddr); runtimeFactory = initRuntimeFactory(runtimeInstanceConfig); runtime = runtimeFactory.createRuntime(runtimeInstanceConfig); @@ -80,19 +80,19 @@ public void init() throws Exception { public void start() throws Exception { if (!StringUtils.isBlank(adminServerAddr)) { -// registryService.subscribe((event) -> { -// log.info("runtime receive registry event: {}", event); -// List registerServerInfoList = event.getInstances(); -// Map registerServerInfoMap = new HashMap<>(); -// for (RegisterServerInfo registerServerInfo : registerServerInfoList) { -// registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); -// } -// if (!registerServerInfoMap.isEmpty()) { -// adminServerInfoMap = registerServerInfoMap; -// updateAdminServerAddr(); -// } -// -// }, runtimeInstanceConfig.getAdminServiceName()); + registryService.subscribe((event) -> { + log.info("runtime receive registry event: {}", event); + List registerServerInfoList = event.getInstances(); + Map registerServerInfoMap = new HashMap<>(); + for (RegisterServerInfo registerServerInfo : registerServerInfoList) { + registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); + } + if (!registerServerInfoMap.isEmpty()) { + adminServerInfoMap = registerServerInfoMap; + updateAdminServerAddr(); + } + + }, runtimeInstanceConfig.getAdminServiceName()); runtime.start(); isStarted = true; } else {