diff --git a/build.gradle b/build.gradle
index 84fd91892b..909d540360 100644
--- a/build.gradle
+++ b/build.gradle
@@ -708,7 +708,7 @@ subprojects {
dependencyManagement {
dependencies {
- dependency "org.apache.commons:commons-lang3:3.14.0"
+ dependency "org.apache.commons:commons-lang3:3.17.0"
dependency "org.apache.commons:commons-collections4:4.4"
dependency "org.apache.commons:commons-text:1.12.0"
dependency "commons-io:commons-io:2.16.1"
@@ -733,7 +733,7 @@ subprojects {
dependency "org.asynchttpclient:async-http-client:2.12.3"
dependency "org.apache.httpcomponents:httpclient:4.5.14"
- dependency "io.netty:netty-all:4.1.111.Final"
+ dependency "io.netty:netty-all:4.1.112.Final"
dependency "io.dropwizard.metrics:metrics-core:${dropwizardMetricsVersion}"
dependency "io.dropwizard.metrics:metrics-healthchecks:${dropwizardMetricsVersion}"
@@ -782,7 +782,7 @@ subprojects {
dependency "org.javassist:javassist:3.30.2-GA"
- dependency "com.alibaba.nacos:nacos-client:2.3.3"
+ dependency "com.alibaba.nacos:nacos-client:2.4.1"
dependency 'org.apache.zookeeper:zookeeper:3.9.2'
dependency "org.apache.curator:curator-client:${curatorVersion}"
@@ -798,7 +798,7 @@ subprojects {
dependency "com.github.rholder:guava-retrying:2.0.0"
dependency "com.alibaba:druid-spring-boot-starter:1.2.23"
- dependency "com.baomidou:mybatis-plus-boot-starter:3.5.5"
+ dependency "com.baomidou:mybatis-plus-boot-starter:3.5.7"
dependency "com.mysql:mysql-connector-j:8.4.0"
dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18"
dependency "org.locationtech.jts:jts-core:1.19.0"
diff --git a/eventmesh-admin-server/bin/start-admin.sh b/eventmesh-admin-server/bin/start-admin.sh
index 93c3644397..1633036617 100644
--- a/eventmesh-admin-server/bin/start-admin.sh
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -56,34 +56,34 @@ function extract_java_version {
#}
function get_pid {
- local ppid=""
- if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
- ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
- # If the process does not exist, it indicates that the previous process terminated abnormally.
+ local ppid=""
+ if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+ ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+ # If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
- else
- if [[ $OS =~ Msys ]]; then
- # There is a Bug on Msys that may not be able to kill the identified process
- ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
- elif [[ $OS =~ Darwin ]]; then
- # Known problem: grep Java may not be able to accurately identify Java processes
- ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
- else
- if [ $DOCKER ]; then
- # No need to exclude root user in Docker containers.
- ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
- else
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill the identified process
+ ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
+ else
+ if [ $DOCKER ]; then
+ # No need to exclude root user in Docker containers.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
+ else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
- fi
- fi
- echo "$ppid";
+ fi
+ fi
+ echo "$ppid";
}
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
-#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
-JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
# echo "proxy is running already"
# exit 9;
# else
-# echo "err pid$pid, rm pid.file"
+# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
exit 9
fi
if [ -n "$pid" ]; then
- echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
- exit 9
+ echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
+ exit 9
fi
make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H
EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
- $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+ $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
- $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+ $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle
index bdb6406da2..95c8fa1372 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-registry:eventmesh-registry-api")
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
- implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation "com.alibaba.nacos:nacos-client"
implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
@@ -38,6 +38,8 @@ dependencies {
implementation "com.alibaba:druid-spring-boot-starter"
compileOnly 'com.mysql:mysql-connector-j'
compileOnly 'org.projectlombok:lombok'
+ testImplementation 'junit:junit:4.12'
+ testImplementation 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml
index 54795057cb..3d702e579e 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -26,7 +26,17 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+# http server port
+server:
+ port: 8082
event-mesh:
admin-server:
- service-name: DEFAULT_GROUP@@em_adm_server
- port: 8081
\ No newline at end of file
+ serviceName: DEFAULT_GROUP@@em_adm_server
+ # grpc server port
+ port: 8081
+ adminServerList:
+ R1:
+ - http://localhost:8082
+ R2:
+ - http://localhost:8082
+ region: R1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
index 586ab1c266..6e28daca8a 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -17,147 +17,134 @@
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET NAMES utf8 */;
-/*!50503 SET NAMES utf8mb4 */;
+/*!50503 SET NAMES utf8 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--- 导出 eventmesh 的数据库结构
-CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
+-- export eventmesh database
+CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `eventmesh`;
--- 导出 表 eventmesh.event_mesh_data_source 结构
+-- export table eventmesh.event_mesh_data_source structure
CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
- `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
- `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `dataType` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `description` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
+ `configuration` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `configurationClass` varchar(200) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
+ `region` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `createUid` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `updateUid` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
--- 数据导出被取消选择。
-
--- 导出 表 eventmesh.event_mesh_job_info 结构
+-- export table eventmesh.event_mesh_job_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
- `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `jobID` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `jobDesc` varchar(50) COLLATE utf8_bin NOT NULL,
+ `taskID` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `transportType` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
`sourceData` int NOT NULL DEFAULT '0',
`targetData` int NOT NULL DEFAULT '0',
- `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `jobState` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
+ `jobType` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
+ `fromRegion` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `runningRegion` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `createUid` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `updateUid` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
--- 导出 表 eventmesh.event_mesh_mysql_position 结构
+-- export table eventmesh.event_mesh_mysql_position structure
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
- `jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
- `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+ `jobID` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `serverUUID` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
+ `address` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`position` bigint DEFAULT NULL,
- `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
- `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `gtid` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
+ `currentGtid` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`timestamp` bigint DEFAULT NULL,
- `journalName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `journalName` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
--- 导出 表 eventmesh.event_mesh_position_reporter_history 结构
+-- export table eventmesh.event_mesh_position_reporter_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
- `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `record` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `job` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `record` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `address` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `job` (`job`),
KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='record position reporter changes';
--- 数据导出被取消选择。
-
--- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构
+-- export table eventmesh.event_mesh_runtime_heartbeat structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
- `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间',
+ `adminAddr` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `runtimeAddr` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `reportTime` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'runtime local report time',
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
- UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
--- 导出 表 eventmesh.event_mesh_runtime_history 结构
+-- export table eventmesh.event_mesh_runtime_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
- `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `job` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `address` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';
--- 数据导出被取消选择。
-
--- 导出 表 eventmesh.event_mesh_task_info 结构
+-- export table eventmesh.event_mesh_task_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
- `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState',
- `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
- `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `taskID` varchar(50) COLLATE utf8_bin NOT NULL,
+ `taskName` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
+ `taskDesc` varchar(50) COLLATE utf8_bin NOT NULL,
+ `taskState` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'taskstate',
+ `sourceRegion` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `targetRegion` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `createUid` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
+ `updateUid` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `taskID` (`taskID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
--- 导出 表 eventmesh.event_mesh_verify 结构
+-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
- `id` int NOT NULL,
- `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
+ `connectorStage` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `position` text COLLATE utf8_bin DEFAULT NULL,
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
index d100e19033..50e6ad82cc 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
@@ -28,6 +28,7 @@
+
@@ -37,7 +38,7 @@
id,dataType,description,
- configuration,region,createUid,updateUid,
- createTime,updateTime
+ configuration,configurationClass,region,
+ createUid,updateUid,createTime,updateTime
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
index 02e8806680..a053d1c838 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
@@ -19,31 +19,33 @@
-->
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
- id,jobID,desc,
+ id,jobID,jobDesc,
taskID,transportType,sourceData,
- targetData,state,jobType,
- fromRegion,createTime,updateTime
+ targetData,jobState,jobType,
+ fromRegion,runningRegion,createUid,
+ updateUid,createTime,updateTime
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
index 05b1dc52a0..c3514fd945 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
@@ -26,10 +26,11 @@
-
-
-
-
+
+
+
+
+
@@ -37,8 +38,8 @@
- id,taskID,name,
- desc,state,fromRegion,
+ id,taskID,taskName,
+ taskDesc,taskState,sourceRegion,targetRegion,
createUid,updateUid,createTime,
updateTime
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
index b7b042145a..45727498cc 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
@@ -26,6 +26,7 @@
+
@@ -35,8 +36,8 @@
- id,taskID,recordID,
- recordSig,connectorName,connectorStage,
+ id,taskID,jobID,recordID,
+ recordSig,connectorName,connectorStage,
position,createTime
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
index 2162731e21..612d398078 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
@@ -17,6 +17,9 @@
package org.apache.eventmesh.admin.server;
+import java.util.List;
+import java.util.Map;
+
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Getter;
@@ -32,4 +35,6 @@ public class AdminServerProperties {
private String configurationPath;
private String configurationFile;
private String serviceName;
+ private Map> adminServerList;
+ private String region;
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index bd896d546c..2454e9f02c 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -18,24 +18,51 @@
package org.apache.eventmesh.admin.server.web;
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
+import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import lombok.extern.slf4j.Slf4j;
+
@RestController
@RequestMapping("/eventmesh/admin")
+@Slf4j
public class HttpServer {
+
@Autowired
private TaskBizService taskService;
- @RequestMapping("/createTask")
- public ResponseEntity> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
- String uuid = taskService.createTask(task);
- return ResponseEntity.ok(Response.success(uuid));
+ @Autowired
+ private VerifyBizService verifyService;
+
+ @RequestMapping(value = "/createTask", method = RequestMethod.POST)
+ public ResponseEntity
*
*
Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently.
* The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
- * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL
- * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)} method processes HttpConnectRecord
+ * on specified URL while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
*
- *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method
+ *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)} method
* to prevent message loss or processing interruptions.
*/
public interface HttpSinkHandler {
@@ -62,9 +63,10 @@ public interface HttpSinkHandler {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
- Future> deliver(URI url, HttpConnectRecord httpConnectRecord);
+ Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes, ConnectRecord connectRecord);
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
new file mode 100644
index 0000000000..e88707482f
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HTTP/HTTPS Sink Handler implementation to handle ConnectRecords by sending them over HTTP or HTTPS to configured URLs.
+ *
+ *
This handler initializes a WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ * It handles processing ConnectRecords by converting them to HttpConnectRecord and sending them asynchronously to each configured URL using the
+ * WebClient.
+ *
+ *
The handler uses Vert.x's WebClient to perform HTTP/HTTPS requests. It initializes the WebClient in the {@link #start()}
+ * method and closes it in the {@link #stop()} method to manage resources efficiently.
+ *
+ *
Each ConnectRecord is processed and sent to all configured URLs concurrently using asynchronous HTTP requests.
+ */
+@Slf4j
+@Getter
+public class CommonHttpSinkHandler extends AbstractHttpSinkHandler {
+
+ private WebClient webClient;
+
+
+ public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+ super(sinkConnectorConfig);
+ }
+
+ /**
+ * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ // Create WebClient
+ doInitWebClient();
+ }
+
+ /**
+ * Initializes the WebClient with the provided configuration options.
+ */
+ private void doInitWebClient() {
+ SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
+ final Vertx vertx = Vertx.vertx();
+ WebClientOptions options = new WebClientOptions()
+ .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+ .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000)
+ .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+ .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+ .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
+ this.webClient = WebClient.create(vertx, options);
+ }
+
+ /**
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
+ * URL using the WebClient.
+ *
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
+ * @return processing chain
+ */
+ @Override
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes,
+ ConnectRecord connectRecord) {
+ // create headers
+ Map extensionMap = new HashMap<>();
+ Set extensionKeySet = httpConnectRecord.getExtensions().keySet();
+ for (String extensionKey : extensionKeySet) {
+ Object v = httpConnectRecord.getExtensions().getObject(extensionKey);
+ extensionMap.put(extensionKey, v);
+ }
+
+ MultiMap headers = HttpHeaders.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
+ .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8")
+ .set("extension", JsonUtils.toJSONString(extensionMap));
+ // get timestamp and offset
+ Long timestamp = httpConnectRecord.getCreateTime()
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+
+ // send the request
+ return this.webClient.post(url.getPath())
+ .host(url.getHost())
+ .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
+ .putHeaders(headers)
+ .ssl(Objects.equals(url.getScheme(), "https"))
+ .sendJson(httpConnectRecord.getData())
+ .onSuccess(res -> {
+ log.info("Request sent successfully. Record: timestamp={}", timestamp);
+
+ Exception e = null;
+
+ // log the response
+ if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received successful response: statusCode={}. Record: timestamp={}, responseBody={}",
+ res.statusCode(), timestamp, res.bodyAsString());
+ } else {
+ log.info("Received successful response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, responseBody={}",
+ res.statusCode(), timestamp, res.bodyAsString());
+ } else {
+ log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+ }
+
+ e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode());
+ }
+
+ // try callback
+ tryCallback(httpConnectRecord, e, attributes, connectRecord);
+ }).onFailure(err -> {
+ log.error("Request failed to send. Record: timestamp={}", timestamp, err);
+
+ // try callback
+ tryCallback(httpConnectRecord, err, attributes, connectRecord);
+ });
+ }
+
+ /**
+ * Tries to call the callback based on the result of the request.
+ *
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @param attributes additional attributes to be used in processing
+ */
+ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes, ConnectRecord record) {
+ // get the retry event
+ HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
+
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+
+ if (multiHttpRequestContext.getRemainingRequests() == 0) {
+ // do callback
+ if (record.getCallback() == null) {
+ if (log.isDebugEnabled()) {
+ log.warn("ConnectRecord callback is null. Ignoring callback. {}", record);
+ } else {
+ log.warn("ConnectRecord callback is null. Ignoring callback.");
+ }
+ return;
+ }
+
+ HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
+ if (lastFailedEvent == null) {
+ // success
+ record.getCallback().onSuccess(convertToSendResult(record));
+ } else {
+ // failure
+ record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
+ }
+ }
+ }
+
+ /**
+ * Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @return the updated retry event
+ */
+ private HttpRetryEvent getAndUpdateRetryEvent(Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
+ // get the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ // update the retry event
+ retryEvent.setLastException(e);
+ return retryEvent;
+ }
+
+
+ /**
+ * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param retryEvent the retry event to use
+ * @return the updated multi http request context
+ */
+ private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpRetryEvent retryEvent) {
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
+
+ if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
+ // decrement the counter
+ multiHttpRequestContext.decrementRemainingRequests();
+
+ // try set failed event
+ if (retryEvent.getLastException() != null) {
+ multiHttpRequestContext.setLastFailedEvent(retryEvent);
+ }
+ }
+
+ return multiHttpRequestContext;
+ }
+
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
+ }
+
+ private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
+ SendExceptionContext sendExceptionContext = new SendExceptionContext();
+ sendExceptionContext.setMessageId(record.getRecordId());
+ sendExceptionContext.setCause(e);
+ if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ sendExceptionContext.setTopic(record.getExtension("topic"));
+ }
+ return sendExceptionContext;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the HTTP/HTTPS handler.
+ */
+ @Override
+ public void stop() {
+ if (this.webClient != null) {
+ this.webClient.close();
+ } else {
+ log.warn("WebClient is null, ignore.");
+ }
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
new file mode 100644
index 0000000000..820b46296a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+
+/**
+ * HttpSinkHandlerRetryWrapper is a wrapper class for the HttpSinkHandler that provides retry functionality for failed HTTP requests.
+ */
+@Slf4j
+public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {
+
+ private final HttpRetryConfig httpRetryConfig;
+
+ private final HttpSinkHandler sinkHandler;
+
+ public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
+ super(sinkConnectorConfig);
+ this.sinkHandler = sinkHandler;
+ this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+ }
+
+ /**
+ * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ sinkHandler.start();
+ }
+
+
+ /**
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+ * HttpConnectRecord
+ *
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to pass to the processing chain
+ * @return processing chain
+ */
+ @Override
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes,
+ ConnectRecord connectRecord) {
+
+ // Build the retry policy
+ RetryPolicy> retryPolicy = RetryPolicy.>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(httpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+ .onRetry(event -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
+ } else {
+ log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
+ }
+ // update the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ retryEvent.increaseCurrentRetries();
+ })
+ .onFailure(event -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
+ httpConnectRecord, event.getException());
+ } else {
+ log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
+ }
+ }).build();
+
+ // Handle the ConnectRecord with retry policy
+ Failsafe.with(retryPolicy)
+ .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes, connectRecord).toCompletionStage());
+
+ return null;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the HTTP/HTTPS handler.
+ */
+ @Override
+ public void stop() {
+ sinkHandler.stop();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
similarity index 82%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 4e64126a9d..7edd84a967 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
-import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.commons.lang3.StringUtils;
@@ -32,6 +33,7 @@
import java.net.URI;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -61,8 +63,6 @@
@Slf4j
public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
- private final SinkConnectorConfig sinkConnectorConfig;
-
// the configuration for webhook
private final HttpWebhookConfig webhookConfig;
@@ -86,7 +86,7 @@ public boolean isExportDestroyed() {
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
- this.sinkConnectorConfig = sinkConnectorConfig;
+
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
@@ -94,9 +94,6 @@ public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
doInitExportServer();
}
- public SynchronizedCircularFifoQueue getReceivedDataQueue() {
- return receivedDataQueue;
- }
/**
* Initialize the server for exporting the received data
@@ -202,22 +199,6 @@ public void start() {
});
}
- /**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
- *
- * @param record the ConnectRecord to process
- */
- @Override
- public void handle(ConnectRecord record) {
- for (URI url : super.getUrls()) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // handle the HttpConnectRecord
- deliver(url, httpConnectRecord);
- }
- }
-
/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified
@@ -225,30 +206,28 @@ public void handle(ConnectRecord record) {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
@Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes,
+ ConnectRecord connectRecord) {
// send the request
- Future> responseFuture = super.deliver(url, httpConnectRecord);
+ Future> responseFuture = super.deliver(url, httpConnectRecord, attributes, connectRecord);
// store the received data
return responseFuture.onComplete(arr -> {
- // If open retry, return directly and handled by RetryHttpSinkHandler
- if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) {
- return;
+ // get tryEvent from attributes
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+
+ HttpResponse response = null;
+ if (arr.succeeded()) {
+ response = arr.result();
+ } else {
+ retryEvent.setLastException(arr.cause());
}
- // create ExportMetadataBuilder
- HttpResponse response = arr.succeeded() ? arr.result() : null;
-
- HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
- .url(url.toString())
- .code(response != null ? response.statusCode() : -1)
- .message(response != null ? response.statusMessage() : arr.cause().getMessage())
- .receivedTime(LocalDateTime.now())
- .retriedBy(null)
- .uuid(httpConnectRecord.getUuid())
- .retryNum(0)
- .build();
+
+ // create ExportMetadata
+ HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
// create ExportRecord
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
@@ -257,6 +236,37 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
});
}
+ /**
+ * Builds the HttpExportMetadata object based on the response, HttpConnectRecord, and HttpRetryEvent.
+ *
+ * @param url the URI to which the HttpConnectRecord was sent
+ * @param response the response received from the URI
+ * @param httpConnectRecord the HttpConnectRecord that was sent
+ * @param retryEvent the SingleHttpRetryEvent that was used for retries
+ * @return the HttpExportMetadata object
+ */
+ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord,
+ HttpRetryEvent retryEvent) {
+
+ String msg = null;
+ // order of precedence: lastException > response > null
+ if (retryEvent.getLastException() != null) {
+ msg = retryEvent.getLimitedExceptionMessage();
+ retryEvent.setLastException(null);
+ } else if (response != null) {
+ msg = response.statusMessage();
+ }
+
+ return HttpExportMetadata.builder()
+ .url(url.toString())
+ .code(response != null ? response.statusCode() : -1)
+ .message(msg)
+ .receivedTime(LocalDateTime.now())
+ .recordId(httpConnectRecord.getHttpRecordId())
+ .retryNum(retryEvent.getCurrentRetries())
+ .build();
+ }
+
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 4155aff910..2b2a01a9dd 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
@@ -41,6 +42,7 @@
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, ConnectorCreateService configClass() {
@@ -106,7 +104,7 @@ private void doInit() {
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
- final Route route = router.route()
+ route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.handler(LoggerHandler.create());
@@ -136,7 +134,15 @@ public void start() {
@Override
public void commit(ConnectRecord record) {
-
+ if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+ this.route.handler(ctx -> {
+ // Return 200 OK
+ ctx.response()
+ .putHeader("content-type", "application/json")
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}");
+ });
+ }
}
@Override
@@ -144,6 +150,19 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+ if (this.route != null) {
+ this.route.failureHandler(ctx -> {
+ log.error("Failed to handle the request, recordId {}. ", record.getRecordId(), ctx.failure());
+ // Return Bad Response
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+ .end("{\"status\":\"failed\",\"recordId\":\"" + record.getRecordId() + "\"}");
+ });
+ }
+ }
+
@Override
public void stop() {
if (this.server != null) {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
index 2fe7399da2..9e1dcb7b4c 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
@@ -20,6 +20,8 @@
import java.io.Serializable;
import java.util.Map;
+import io.vertx.ext.web.RoutingContext;
+
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -42,4 +44,6 @@ public class WebhookRequest implements Serializable {
private Object payload;
+ private RoutingContext routingContext;
+
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 80e4f0a753..0761170ac0 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -19,21 +19,23 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.http.HttpMethod;
+import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.handler.BodyHandler;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -45,6 +47,8 @@ public class CommonProtocol implements Protocol {
public static final String PROTOCOL_NAME = "Common";
+ private SourceConnectorConfig sourceConnectorConfig;
+
/**
* Initialize the protocol
*
@@ -52,7 +56,7 @@ public class CommonProtocol implements Protocol {
*/
@Override
public void initialize(SourceConnectorConfig sourceConnectorConfig) {
-
+ this.sourceConnectorConfig = sourceConnectorConfig;
}
/**
@@ -67,20 +71,24 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue queue)
.handler(BodyHandler.create())
.handler(ctx -> {
// Get the payload
- String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+ Object payload = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+ payload = JsonUtils.parseObject(payload.toString(), String.class);
// Create and store the webhook request
Map headerMap = ctx.request().headers().entries().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+ WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payload, ctx);
if (!queue.offer(webhookRequest)) {
throw new IllegalStateException("Failed to store the request.");
}
- // Return 200 OK
- ctx.response()
- .setStatusCode(HttpResponseStatus.OK.code())
- .end(CommonResponse.success().toJsonStr());
+ if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
+ // Return 200 OK
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .end(CommonResponse.success().toJsonStr());
+ }
+
})
.failureHandler(ctx -> {
log.error("Failed to handle the request. ", ctx.failure());
@@ -105,7 +113,27 @@ public ConnectRecord convertToConnectRecord(Object message) {
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload());
connectRecord.addExtension("source", request.getProtocolName());
connectRecord.addExtension("url", request.getUrl());
- connectRecord.addExtension("headers", request.getHeaders());
+ request.getHeaders().forEach((k, v) -> {
+ if (k.equalsIgnoreCase("extension")) {
+ JsonObject extension = new JsonObject(v);
+ extension.forEach(e -> connectRecord.addExtension(e.getKey(), e.getValue()));
+ }
+ });
+ // check recordUniqueId
+ if (!connectRecord.getExtensions().containsKey("recordUniqueId")) {
+ connectRecord.addExtension("recordUniqueId", connectRecord.getRecordId());
+ }
+
+ // check data
+ if (connectRecord.getExtensionObj("isBase64") != null) {
+ if (Boolean.parseBoolean(connectRecord.getExtensionObj("isBase64").toString())) {
+ byte[] data = Base64.getDecoder().decode(connectRecord.getData().toString());
+ connectRecord.setData(data);
+ }
+ }
+ if (request.getRoutingContext() != null) {
+ connectRecord.addExtension("routingContext", request.getRoutingContext());
+ }
return connectRecord;
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
index e86efcbf33..fac8c0d801 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -132,7 +132,7 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue queue)
// Create and store the webhook request
Map headerMap = headers.entries().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+ WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr, ctx);
if (!queue.offer(webhookRequest)) {
throw new IllegalStateException("Failed to store the request.");
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
new file mode 100644
index 0000000000..d62ff11992
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+HTTP-Source=org.apache.eventmesh.connector.http.source.HttpSourceConnector
+HTTP-Sink=org.apache.eventmesh.connector.http.sink.HttpSinkConnector
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 3e724627c0..5f65f0749f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -20,8 +20,8 @@
import static org.mockserver.model.HttpRequest.request;
-import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
-import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
@@ -86,7 +86,7 @@ void before() throws Exception {
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
- .withStatusCode(200)
+ .withStatusCode(HttpStatus.SC_OK)
.withBody(new JSONObject()
.fluentPut("code", 0)
.fluentPut("message", "success")
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
index 39681bf179..cc00f1e142 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
@@ -139,6 +139,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
/**
* Stops the Connector.
*
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 2b2efcbef2..810a59e723 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -192,6 +192,11 @@ public String name() {
return "JDBC Source Connector";
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
/**
* Stops the Connector.
*
diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
index b257cd0f44..0adafc1ce6 100644
--- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
@@ -94,6 +94,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
producer.close();
diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
index a3be1cbf93..d573126934 100644
--- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
@@ -94,6 +94,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
kafkaConsumer.unsubscribe();
diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
index a12a1c7461..b14f77ecd4 100644
--- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
@@ -82,6 +82,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
started.compareAndSet(true, false);
diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
index 537c1ad4d9..1b0c033e8f 100644
--- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
@@ -65,6 +65,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
started.compareAndSet(true, false);
diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
index d340dffd13..9981322e8f 100644
--- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
@@ -110,6 +110,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (!started.compareAndSet(true, false)) {
diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
index 776ea8d71f..1001ffa584 100644
--- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
@@ -87,6 +87,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.client.stop();
diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
index e57c396719..df3f66d6a6 100644
--- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
@@ -93,6 +93,11 @@ public String name() {
return this.sourceConfig.connectorConfig.getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.client.stop();
diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
index 63444efe28..0f00a7e381 100644
--- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
@@ -74,6 +74,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
}
diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
index b66bf9b18c..534ecfb79d 100644
--- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
@@ -76,6 +76,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
index e5f09e4350..e089ef6760 100644
--- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
@@ -109,6 +109,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
writerMap.forEach((topic, writer) -> writer.close());
diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
index 2611617d8f..836779dbcf 100644
--- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
@@ -148,6 +148,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
sourceHandlerMap.forEach((topic, handler) -> {
diff --git a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
index 5c78c718e3..0cafed73f3 100644
--- a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
@@ -145,6 +145,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
log.info("prometheus source connector stop.");
diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
index 9ff1f22a29..3f90c6c1be 100644
--- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
@@ -85,6 +85,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
try {
diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
index 212d3eb487..0bc576221e 100644
--- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
@@ -87,6 +87,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
try {
diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
index 4a94a2cb1f..08d1cefbac 100644
--- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
@@ -95,6 +95,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (started) {
diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
index 655c20d9b9..0b7e726bda 100644
--- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
@@ -117,6 +117,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (started) {
diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
index 0c75e7e108..2525e078db 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
@@ -19,7 +19,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
- implementation 'org.redisson:redisson:3.32.0'
+ implementation 'org.redisson:redisson:3.35.0'
api 'io.cloudevents:cloudevents-json-jackson'
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
index 83c3498a99..5b7d27c3ba 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
@@ -85,6 +85,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.redissonClient.shutdown();
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
index 70adce59e2..868639c205 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
@@ -94,6 +94,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.topic.removeAllListeners();
diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
index ae9d4824e5..31d45a28f4 100644
--- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
@@ -78,6 +78,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
producer.shutdown();
diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
index 8ccb84acce..410f927d75 100644
--- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
@@ -206,6 +206,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
consumer.unsubscribe(sourceConfig.getConnectorConfig().getTopic());
diff --git a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
index d0dc30c15e..078ed7691a 100644
--- a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
@@ -121,6 +121,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
diff --git a/eventmesh-connectors/eventmesh-connector-slack/build.gradle b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
index d577d3358a..665f748b5f 100644
--- a/eventmesh-connectors/eventmesh-connector-slack/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
@@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-sdks:eventmesh-sdk-java")
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
- implementation "com.slack.api:bolt:1.40.+"
+ implementation "com.slack.api:bolt:1.42.+"
implementation 'com.google.guava:guava'
compileOnly 'org.projectlombok:lombok'
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
index e48760d506..836409af71 100644
--- a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
@@ -84,6 +84,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
isRunning = false;
diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
index 94c40eea50..9ba99cd547 100644
--- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
@@ -77,6 +77,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
index a337c1cd81..5f38914bb1 100644
--- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.connector.spring.source;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
/**
* Operations for sending messages.
diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index 2ab5a3a3c0..db286eb609 100644
--- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -25,10 +25,10 @@
import org.apache.eventmesh.common.remote.offset.spring.SpringRecordPartition;
import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
import org.apache.eventmesh.openconnect.SourceWorker;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.util.ArrayList;
@@ -95,6 +95,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
@@ -123,6 +128,7 @@ public List poll() {
/**
* Send message.
+ *
* @param message message to send
*/
@Override
@@ -136,9 +142,9 @@ public void send(Object message) {
/**
* Send message with a callback.
- * @param message message to send.
- * @param workerCallback After the user sends the message to the Connector,
- * the SourceWorker will fetch message and invoke.
+ *
+ * @param message message to send.
+ * @param workerCallback After the user sends the message to the Connector, the SourceWorker will fetch message and invoke.
*/
@Override
public void send(Object message, SendMessageCallback workerCallback) {
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
index dec3f5e5de..6908d119b9 100644
--- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -115,6 +115,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws IOException {
isRunning = false;
diff --git a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
index ef6aed58c5..ca628fa590 100644
--- a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
@@ -95,6 +95,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws IOException {
isRunning = false;
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
index b7ea8890ee..a734bb6efa 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
@@ -19,9 +19,9 @@
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import java.util.HashMap;
import java.util.Map;
@@ -53,8 +53,8 @@ public void onSuccess(SendResult sendResult) {
}
@Override
- public void onException(SendExcepionContext sendExcepionContext) {
- log.info("Spring source worker send message to EventMesh failed!", sendExcepionContext.getCause());
+ public void onException(SendExceptionContext sendExceptionContext) {
+ log.info("Spring source worker send message to EventMesh failed!", sendExceptionContext.getCause());
}
});
return "success!";
diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle
index 6144bcdd56..5b2324ce57 100644
--- a/eventmesh-meta/eventmesh-meta-raft/build.gradle
+++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle
@@ -20,7 +20,7 @@ plugins {
}
def grpcVersion = '1.65.1'
-def protobufVersion = '3.25.3'
+def protobufVersion = '3.25.4'
def protocVersion = protobufVersion
def jraftVersion = '1.3.14'
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 6e48aa1de8..2a2162a7af 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -32,11 +32,11 @@
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -264,8 +264,8 @@ private SendResult convertToSendResult(CloudEvent event) {
return result;
}
- private SendExcepionContext convertToExceptionContext(CloudEvent event, Throwable cause) {
- SendExcepionContext exceptionContext = new SendExcepionContext();
+ private SendExceptionContext convertToExceptionContext(CloudEvent event, Throwable cause) {
+ SendExceptionContext exceptionContext = new SendExceptionContext();
exceptionContext.setTopic(event.getId());
exceptionContext.setMessageId(event.getId());
exceptionContext.setCause(cause);
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
index 8ac09eac38..07e44aea94 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
@@ -34,8 +34,7 @@ public interface Connector extends ComponentLifeCycle {
Class extends Config> configClass();
/**
- * This init method is obsolete. For detailed discussion,
- * please see here
+ * This init method is obsolete. For detailed discussion, please see here
*
* Initializes the Connector with the provided configuration.
*
@@ -67,4 +66,12 @@ public interface Connector extends ComponentLifeCycle {
*/
String name();
+ /**
+ * This method will be called when an exception occurs while processing a ConnectRecord object. This method can be used to handle the exception,
+ * such as logging error information, or stopping the connector's operation when an exception occurs.
+ *
+ * @param record The ConnectRecord object that was being processed when the exception occurred
+ */
+ void onException(ConnectRecord record);
+
}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
index cf1b853474..1ef048b06c 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
@@ -18,6 +18,9 @@
package org.apache.eventmesh.openconnect.api.connector;
import org.apache.eventmesh.common.config.connector.SinkConfig;
+import org.apache.eventmesh.common.remote.job.JobType;
+
+import java.util.Map;
import lombok.Data;
@@ -29,4 +32,8 @@ public class SinkConnectorContext implements ConnectorContext {
public SinkConfig sinkConfig;
+ public Map runtimeConfig;
+
+ public JobType jobType;
+
}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 55c88ce55a..957452bb10 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -18,10 +18,12 @@
package org.apache.eventmesh.openconnect.api.connector;
import org.apache.eventmesh.common.config.connector.SourceConfig;
+import org.apache.eventmesh.common.remote.job.JobType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
import java.util.List;
+import java.util.Map;
import lombok.Data;
@@ -35,6 +37,10 @@ public class SourceConnectorContext implements ConnectorContext {
public SourceConfig sourceConfig;
+ public Map runtimeConfig;
+
+ public JobType jobType;
+
// initial record position
public List recordPositionList;
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 08270fc024..993352a979 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -111,6 +112,8 @@ public void persist() {
reportPositionRequest.setRecordPositionList(recordToSyncList);
+ log.debug("start report position request: {}", JsonUtils.toJSONString(reportPositionRequest));
+
Metadata metadata = Metadata.newBuilder()
.setType(ReportPositionRequest.class.getSimpleName())
.build();
@@ -120,6 +123,7 @@ public void persist() {
.build())
.build();
requestObserver.onNext(payload);
+ log.debug("end report position request: {}", JsonUtils.toJSONString(reportPositionRequest));
for (Map.Entry entry : recordMap.entrySet()) {
positionStore.remove(entry.getKey());
@@ -236,7 +240,7 @@ public void initialize(OffsetStorageConfig offsetStorageConfig) {
this.dataSourceType = offsetStorageConfig.getDataSourceType();
this.dataSinkType = offsetStorageConfig.getDataSinkType();
- this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr();
+ this.adminServerAddr = getRandomAdminServerAddr(offsetStorageConfig.getOffsetStorageAddr());
this.channel = ManagedChannelBuilder.forTarget(adminServerAddr)
.usePlaintext()
.build();
@@ -274,4 +278,14 @@ public void onCompleted() {
this.jobState = TaskState.RUNNING;
this.jobId = offsetStorageConfig.getExtensions().get("jobId");
}
+
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
similarity index 90%
rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
index 0311ceaef5..974b19a547 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
-public class SendExcepionContext {
+public class SendExceptionContext {
private String messageId;
private String topic;
private Throwable cause;
- public SendExcepionContext() {
+ public SendExceptionContext() {
}
public String getMessageId() {
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
similarity index 87%
rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
index fd6baba7ec..8346cf36b4 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
/**
* Message sending callback interface.
@@ -24,5 +24,5 @@ public interface SendMessageCallback {
void onSuccess(SendResult sendResult);
- void onException(SendExcepionContext sendExcepionContext);
+ void onException(SendExceptionContext sendExceptionContext);
}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
similarity index 95%
rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
index 8cd861f6de..9afc745f3d 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
public class SendResult {
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index cda57e3758..0a41e18f7c 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -20,23 +20,38 @@
import org.apache.eventmesh.common.remote.offset.RecordOffset;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
+
+import lombok.Getter;
+import lombok.Setter;
/**
* SourceDataEntries are generated by SourceTasks and passed to specific message queue to store.
*/
+@Getter
public class ConnectRecord {
+ private final String recordId = UUID.randomUUID().toString();
+
+ @Setter
private Long timestamp;
+ @Setter
private Object data;
+ @Setter
private RecordPosition position;
+ @Setter
private KeyValue extensions;
+ @Setter
+ private SendMessageCallback callback;
+
public ConnectRecord() {
}
@@ -57,38 +72,6 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
this.data = data;
}
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-
- public KeyValue getExtensions() {
- return extensions;
- }
-
- public void setExtensions(KeyValue extensions) {
- this.extensions = extensions;
- }
-
- public RecordPosition getPosition() {
- return position;
- }
-
- public void setPosition(RecordPosition position) {
- this.position = position;
- }
-
public void addExtension(KeyValue extensions) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
@@ -136,19 +119,20 @@ public boolean equals(Object o) {
return false;
}
ConnectRecord that = (ConnectRecord) o;
- return Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data)
+ return Objects.equals(recordId, that.recordId) && Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data)
&& Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions);
}
@Override
public int hashCode() {
- return Objects.hash(timestamp, data, position, extensions);
+ return Objects.hash(recordId, timestamp, data, position, extensions);
}
@Override
public String toString() {
return "ConnectRecord{"
- + "timestamp=" + timestamp
+ + "recordId=" + recordId
+ + ", timestamp=" + timestamp
+ ", data=" + data
+ ", position=" + position
+ ", extensions=" + extensions
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index a0390c1892..891df482be 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -23,6 +23,11 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
public class DefaultKeyValue implements KeyValue {
private final Map properties;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
index 6b196a1f8e..c3904f4822 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
@@ -23,7 +23,7 @@ dependencies {
implementation ("io.grpc:grpc-protobuf:1.65.1") {
exclude group: "com.google.protobuf", module: "protobuf-java"
}
- implementation("com.google.protobuf:protobuf-java:3.25.3")
+ implementation("com.google.protobuf:protobuf-java:3.25.4")
implementation "io.cloudevents:cloudevents-protobuf"
compileOnly 'org.projectlombok:lombok'
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-grpc/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-grpc/build.gradle
index c28e10f728..0149929479 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-grpc/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-grpc/build.gradle
@@ -25,7 +25,7 @@ repositories {
}
def grpcVersion = '1.65.1'
-def protobufVersion = '3.25.3'
+def protobufVersion = '3.25.4'
def protocVersion = protobufVersion
dependencies {
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-http/build.gradle
index 2544359735..67a9ef6183 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-http/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/build.gradle
@@ -23,5 +23,5 @@ dependencies {
implementation ("io.grpc:grpc-protobuf:1.65.1") {
exclude group: "com.google.protobuf", module: "protobuf-java"
}
- implementation("com.google.protobuf:protobuf-java:3.25.3")
+ implementation("com.google.protobuf:protobuf-java:3.25.4")
}
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/build.gradle
index edd8632919..d67f5fd9e3 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/build.gradle
@@ -21,7 +21,7 @@ dependencies {
implementation ("io.grpc:grpc-protobuf:1.65.1") {
exclude group: "com.google.protobuf", module: "protobuf-java"
}
- implementation("com.google.protobuf:protobuf-java:3.25.3")
+ implementation("com.google.protobuf:protobuf-java:3.25.4")
implementation "io.cloudevents:cloudevents-protobuf"
testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
diff --git a/eventmesh-runtime-v2/build.gradle b/eventmesh-runtime-v2/build.gradle
index ecba7bffb4..04b460ade3 100644
--- a/eventmesh-runtime-v2/build.gradle
+++ b/eventmesh-runtime-v2/build.gradle
@@ -35,6 +35,7 @@ dependencies {
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation project(":eventmesh-connectors:eventmesh-connector-canal")
+ implementation project(":eventmesh-connectors:eventmesh-connector-http")
implementation project(":eventmesh-meta:eventmesh-meta-api")
implementation project(":eventmesh-meta:eventmesh-meta-nacos")
implementation project(":eventmesh-registry:eventmesh-registry-api")
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
index 7171b3fc27..caa5330fe3 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
@@ -28,6 +28,8 @@
@Config(path = "classPath://runtime.yaml")
public class RuntimeInstanceConfig {
+ private boolean registryEnabled;
+
private String registryServerAddr;
private String registryPluginType;
@@ -36,7 +38,7 @@ public class RuntimeInstanceConfig {
private String adminServiceName;
- private String adminServerAddr;
+ private String adminServiceAddr;
private ComponentType componentType;
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index 0fade897f6..beb1d1eedc 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -41,11 +41,11 @@
@Slf4j
public class RuntimeInstance {
- private String adminServerAddr = "127.0.0.1:8081";
+ private String adminServiceAddr;
private Map adminServerInfoMap = new HashMap<>();
- private final RegistryService registryService;
+ private RegistryService registryService;
private Runtime runtime;
@@ -57,46 +57,54 @@ public class RuntimeInstance {
public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
- this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
+ if (runtimeInstanceConfig.isRegistryEnabled()) {
+ this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
+ }
}
public void init() throws Exception {
- registryService.init();
- QueryInstances queryInstances = new QueryInstances();
- queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
- queryInstances.setHealth(true);
- List adminServerRegisterInfoList = registryService.selectInstances(queryInstances);
- if (!adminServerRegisterInfoList.isEmpty()) {
- adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList);
+ if (registryService != null) {
+ registryService.init();
+ QueryInstances queryInstances = new QueryInstances();
+ queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
+ queryInstances.setHealth(true);
+ List adminServerRegisterInfoList = registryService.selectInstances(queryInstances);
+ if (!adminServerRegisterInfoList.isEmpty()) {
+ adminServiceAddr = getRandomAdminServerAddr(adminServerRegisterInfoList);
+ } else {
+ throw new RuntimeException("admin server address is empty, please check");
+ }
+ // use registry adminServiceAddr value replace config
+ runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr);
} else {
- throw new RuntimeException("admin server address is empty, please check");
+ adminServiceAddr = runtimeInstanceConfig.getAdminServiceAddr();
}
- runtimeInstanceConfig.setAdminServerAddr(adminServerAddr);
+
runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
runtime = runtimeFactory.createRuntime(runtimeInstanceConfig);
runtime.init();
}
public void start() throws Exception {
- if (!StringUtils.isBlank(adminServerAddr)) {
-
- registryService.subscribe((event) -> {
- log.info("runtime receive registry event: {}", event);
- List registerServerInfoList = event.getInstances();
- Map registerServerInfoMap = new HashMap<>();
- for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
- registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
- }
- if (!registerServerInfoMap.isEmpty()) {
- adminServerInfoMap = registerServerInfoMap;
- updateAdminServerAddr();
- }
-
- }, runtimeInstanceConfig.getAdminServiceName());
+ if (StringUtils.isBlank(adminServiceAddr)) {
+ throw new RuntimeException("admin server address is empty, please check");
+ } else {
+ if (registryService != null) {
+ registryService.subscribe((event) -> {
+ log.info("runtime receive registry event: {}", event);
+ List registerServerInfoList = event.getInstances();
+ Map registerServerInfoMap = new HashMap<>();
+ for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
+ registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
+ }
+ if (!registerServerInfoMap.isEmpty()) {
+ adminServerInfoMap = registerServerInfoMap;
+ updateAdminServerAddr();
+ }
+ }, runtimeInstanceConfig.getAdminServiceName());
+ }
runtime.start();
isStarted = true;
- } else {
- throw new RuntimeException("admin server address is empty, please check");
}
}
@@ -106,14 +114,14 @@ public void shutdown() throws Exception {
private void updateAdminServerAddr() throws Exception {
if (isStarted) {
- if (!adminServerInfoMap.containsKey(adminServerAddr)) {
- adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap);
- log.info("admin server address changed to: {}", adminServerAddr);
+ if (!adminServerInfoMap.containsKey(adminServiceAddr)) {
+ adminServiceAddr = getRandomAdminServerAddr(adminServerInfoMap);
+ log.info("admin server address changed to: {}", adminServiceAddr);
shutdown();
start();
}
} else {
- adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap);
+ adminServiceAddr = getRandomAdminServerAddr(adminServerInfoMap);
}
}
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 0335a09568..3d3c864b58 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -31,19 +31,23 @@
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.request.FetchJobRequest;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -56,14 +60,17 @@
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.UUID;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -124,12 +131,18 @@ public class ConnectorRuntime implements Runtime {
private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor();
+ private final ExecutorService reportVerifyExecutor = Executors.newSingleThreadExecutor();
+
private final BlockingQueue queue;
private volatile boolean isRunning = false;
+ private volatile boolean isFailed = false;
+
public static final String CALLBACK_EXTENSION = "callBackExtension";
+ private String adminServerAddr;
+
public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -147,8 +160,9 @@ public void init() throws Exception {
}
private void initAdminService() {
+ adminServerAddr = getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
// create gRPC channel
- channel = ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServerAddr()).usePlaintext().build();
+ channel = ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady();
@@ -174,6 +188,16 @@ public void onCompleted() {
requestObserver = adminServiceStub.invokeBiStream(responseObserver);
}
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
+
private void initStorageService() {
// TODO: init producer & consumer
producer = StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());
@@ -189,6 +213,8 @@ private void initConnectorService() throws Exception {
FetchJobResponse jobResponse = fetchJobConfig();
if (jobResponse == null) {
+ isFailed = true;
+ stop();
throw new RuntimeException("fetch job config fail");
}
@@ -200,24 +226,18 @@ private void initConnectorService() throws Exception {
connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc());
connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig());
- ConnectorCreateService> sourceConnectorCreateService =
- ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType() + "-Source");
- sourceConnector = (Source) sourceConnectorCreateService.create();
-
- SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass());
- SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
- sourceConnectorContext.setSourceConfig(sourceConfig);
- sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
- if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
- sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
- }
-
// spi load offsetMgmtService
this.offsetManagement = new RecordOffsetManagement();
this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
- OffsetStorageConfig offsetStorageConfig = sourceConfig.getOffsetStorageConfig();
+ OffsetStorageConfig offsetStorageConfig = new OffsetStorageConfig();
+ offsetStorageConfig.setOffsetStorageAddr(connectorRuntimeConfig.getRuntimeConfig().get("offsetStorageAddr").toString());
+ offsetStorageConfig.setOffsetStorageType(connectorRuntimeConfig.getRuntimeConfig().get("offsetStoragePluginType").toString());
offsetStorageConfig.setDataSourceType(jobResponse.getTransportType().getSrc());
offsetStorageConfig.setDataSinkType(jobResponse.getTransportType().getDst());
+ Map offsetStorageExtensions = new HashMap<>();
+ offsetStorageExtensions.put("jobId", connectorRuntimeConfig.getJobID());
+ offsetStorageConfig.setExtensions(offsetStorageExtensions);
+
this.offsetManagementService = Optional.ofNullable(offsetStorageConfig).map(OffsetStorageConfig::getOffsetStorageType)
.map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType))
.orElse(new DefaultOffsetManagementServiceImpl());
@@ -225,6 +245,19 @@ private void initConnectorService() throws Exception {
this.offsetStorageWriter = new OffsetStorageWriterImpl(offsetManagementService);
this.offsetStorageReader = new OffsetStorageReaderImpl(offsetManagementService);
+ ConnectorCreateService> sourceConnectorCreateService =
+ ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType() + "-Source");
+ sourceConnector = (Source) sourceConnectorCreateService.create();
+
+ SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass());
+ SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
+ sourceConnectorContext.setSourceConfig(sourceConfig);
+ sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
+ sourceConnectorContext.setJobType(jobResponse.getType());
+ sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+ if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+ sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+ }
sourceConnector.init(sourceConnectorContext);
ConnectorCreateService> sinkConnectorCreateService =
@@ -234,8 +267,12 @@ private void initConnectorService() throws Exception {
SinkConfig sinkConfig = (SinkConfig) ConfigUtil.parse(connectorRuntimeConfig.getSinkConnectorConfig(), sinkConnector.configClass());
SinkConnectorContext sinkConnectorContext = new SinkConnectorContext();
sinkConnectorContext.setSinkConfig(sinkConfig);
+ sinkConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
+ sinkConnectorContext.setJobType(jobResponse.getType());
sinkConnector.init(sinkConnectorContext);
+ reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT);
+
}
private FetchJobResponse fetchJobConfig() {
@@ -282,6 +319,7 @@ public void start() throws Exception {
try {
startSinkConnector();
} catch (Exception e) {
+ isFailed = true;
log.error("sink connector [{}] start fail", sinkConnector.name(), e);
try {
this.stop();
@@ -296,6 +334,7 @@ public void start() throws Exception {
try {
startSourceConnector();
} catch (Exception e) {
+ isFailed = true;
log.error("source connector [{}] start fail", sourceConnector.name(), e);
try {
this.stop();
@@ -305,15 +344,25 @@ public void start() throws Exception {
throw new RuntimeException(e);
}
});
+
+ reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING);
}
@Override
public void stop() throws Exception {
+ log.info("ConnectorRuntime start stop");
+ isRunning = false;
+ if (isFailed) {
+ reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL);
+ } else {
+ reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.COMPLETE);
+ }
sourceConnector.stop();
sinkConnector.stop();
sourceService.shutdown();
sinkService.shutdown();
heartBeatExecutor.shutdown();
+ reportVerifyExecutor.shutdown();
requestObserver.onCompleted();
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
@@ -327,50 +376,111 @@ private void startSourceConnector() throws Exception {
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty()) {
for (ConnectRecord record : connectorRecordList) {
+ // check recordUniqueId
+ if (record.getExtensions() == null || !record.getExtensions().containsKey("recordUniqueId")) {
+ record.addExtension("recordUniqueId", record.getRecordId());
+ }
+
+ queue.put(record);
+
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE);
}
- queue.put(record);
- Optional submittedRecordPosition = prepareToUpdateRecordOffset(record);
- Optional callback =
- Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v);
- // commit record
- this.sourceConnector.commit(record);
- submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
- // TODO:finish the optional callback
- // callback.ifPresent(cb -> cb.onSuccess(record));
- offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS);
- // update & commit offset
- updateCommittableOffsets();
- commitOffsets();
+ // set a callback for this record
+ // if used the memory storage callback will be triggered after sink put success
+ record.setCallback(new SendMessageCallback() {
+ @Override
+ public void onSuccess(SendResult result) {
+ log.debug("send record to sink callback success, record: {}", record);
+ // commit record
+ sourceConnector.commit(record);
+ if (record.getPosition() != null) {
+ Optional submittedRecordPosition = prepareToUpdateRecordOffset(record);
+ submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+ log.debug("start wait all messages to commit");
+ offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS);
+ // update & commit offset
+ updateCommittableOffsets();
+ commitOffsets();
+ }
+ Optional callback =
+ Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v);
+ callback.ifPresent(cb -> cb.onSuccess(convertToSendResult(record)));
+ }
+
+ @Override
+ public void onException(SendExceptionContext sendExceptionContext) {
+ isFailed = true;
+ // handle exception
+ sourceConnector.onException(record);
+ log.error("send record to sink callback exception, process will shut down, record: {}", record,
+ sendExceptionContext.getCause());
+ try {
+ stop();
+ } catch (Exception e) {
+ log.error("Failed to stop after exception", e);
+ }
+ }
+ });
}
}
}
}
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if (StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
+ }
+
private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
- UUID uuid = UUID.randomUUID();
- String recordId = uuid.toString();
- String md5Str = md5(record.toString());
- ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
- reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
- reportVerifyRequest.setRecordID(recordId);
- reportVerifyRequest.setRecordSig(md5Str);
- reportVerifyRequest.setConnectorName(
- IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
- reportVerifyRequest.setConnectorStage(connectorStage.name());
- reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
-
- Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
+ reportVerifyExecutor.submit(() -> {
+ try {
+ // use record data + recordUniqueId for md5
+ String md5Str = md5(record.getData().toString() + record.getExtension("recordUniqueId"));
+ ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
+ reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
+ reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
+ reportVerifyRequest.setRecordID(record.getRecordId());
+ reportVerifyRequest.setRecordSig(md5Str);
+ reportVerifyRequest.setConnectorName(
+ IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
+ reportVerifyRequest.setConnectorStage(connectorStage.name());
+ reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
+
+ Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
+
+ Payload request = Payload.newBuilder().setMetadata(metadata)
+ .setBody(
+ Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
+ .build())
+ .build();
+
+ requestObserver.onNext(request);
+ } catch (Exception e) {
+ log.error("Failed to report verify request", e);
+ }
+ });
+ }
- Payload request = Payload.newBuilder().setMetadata(metadata)
- .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
+ private void reportJobRequest(String jobId, JobState jobState) throws InterruptedException {
+ ReportJobRequest reportJobRequest = new ReportJobRequest();
+ reportJobRequest.setJobID(jobId);
+ reportJobRequest.setState(jobState);
+ Metadata metadata = Metadata.newBuilder()
+ .setType(ReportJobRequest.class.getSimpleName())
+ .build();
+ Payload payload = Payload.newBuilder()
+ .setMetadata(metadata)
+ .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
.build())
.build();
-
- requestObserver.onNext(request);
+ requestObserver.onNext(payload);
}
private String md5(String input) {
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
index 5a58cce08e..ab6fc3aaf5 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
@@ -37,6 +37,8 @@ public class ConnectorRuntimeConfig {
private String region;
+ private Map runtimeConfig;
+
private String sourceConnectorType;
private String sourceConnectorDesc;
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index bf7f58028b..3e407fa3e9 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -15,6 +15,9 @@
# limitations under the License.
#
-taskID: 1
-jobID: 1
+taskID: 9c18a0d2-7a61-482c-8275-34f8c2786cea
+jobID: a01fd5e1-d295-4b89-99bc-0ae23eb85acf
region: region1
+runtimeConfig: # this used for connector runtime config
+ offsetStoragePluginType: admin
+ offsetStorageAddr: "127.0.0.1:8081;127.0.0.1:8081"
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
index 44c5f6f91f..9ac36f27b0 100644
--- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
@@ -16,7 +16,9 @@
#
componentType: CONNECTOR
+registryEnabled: false
registryServerAddr: 127.0.0.1:8085
registryPluginType: nacos
storagePluginType: memory
adminServiceName: eventmesh-admin
+adminServiceAddr: "127.0.0.1:8081;127.0.0.1:8081"
diff --git a/eventmesh-sdks/eventmesh-sdk-java/build.gradle b/eventmesh-sdks/eventmesh-sdk-java/build.gradle
index c59cadb068..d2f8c122c2 100644
--- a/eventmesh-sdks/eventmesh-sdk-java/build.gradle
+++ b/eventmesh-sdks/eventmesh-sdk-java/build.gradle
@@ -54,7 +54,7 @@ dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
- implementation "com.google.protobuf:protobuf-java-util:3.25.3"
+ implementation "com.google.protobuf:protobuf-java-util:3.25.4"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
diff --git a/eventmesh-storage-plugin/eventmesh-storage-redis/build.gradle b/eventmesh-storage-plugin/eventmesh-storage-redis/build.gradle
index 1ba2ac0c7b..7a195562b5 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-redis/build.gradle
+++ b/eventmesh-storage-plugin/eventmesh-storage-redis/build.gradle
@@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
// redisson
- implementation 'org.redisson:redisson:3.32.0'
+ implementation 'org.redisson:redisson:3.35.0'
// netty
implementation 'io.netty:netty-all'
@@ -29,7 +29,7 @@ dependencies {
api 'io.cloudevents:cloudevents-json-jackson'
// test dependencies
- testImplementation 'com.github.fppt:jedis-mock:1.1.2'
+ testImplementation 'com.github.fppt:jedis-mock:1.1.3'
testImplementation "org.mockito:mockito-core"
compileOnly 'org.projectlombok:lombok'