Skip to content

Commit

Permalink
[ISSUE #4979] Canal Connector supports bidirectional data synchroniza…
Browse files Browse the repository at this point in the history
…tion (#5011)

* [ISSUE #4979]Canal Connector supports bidirectional data synchronization

* add bash files for admin & runtime-v2

* fix ack offset read & persist

* fix checkStyle error
  • Loading branch information
xwm1992 authored Jul 1, 2024
1 parent bfafbfb commit ae621d4
Show file tree
Hide file tree
Showing 24 changed files with 704 additions and 154 deletions.
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ tasks.register('dist') {
["eventmesh-common",
"eventmesh-meta:eventmesh-meta-api",
"eventmesh-metrics-plugin:eventmesh-metrics-api",
"eventmesh-openconnect:eventmesh-openconnect-java",
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
"eventmesh-protocol-plugin:eventmesh-protocol-api",
"eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
"eventmesh-runtime",
"eventmesh-runtime-v2",
"eventmesh-security-plugin:eventmesh-security-api",
"eventmesh-spi",
"eventmesh-starter",
Expand Down Expand Up @@ -660,6 +664,7 @@ subprojects {

dependencyManagement {
dependencies {

dependency "org.apache.commons:commons-lang3:3.6"
dependency "org.apache.commons:commons-collections4:4.4"
dependency "org.apache.commons:commons-text:1.9"
Expand Down Expand Up @@ -709,7 +714,7 @@ subprojects {
dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
dependency "com.jayway.jsonpath:json-path:2.9.0"

dependency "org.springframework.boot:spring-boot-starter-web:2.7.18"
dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
dependency "io.openmessaging:registry-server:0.0.1"

dependency "org.junit.jupiter:junit-jupiter:5.6.0"
Expand Down
201 changes: 201 additions & 0 deletions eventmesh-admin-server/bin/start-admin.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#!/bin/bash
#
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
set -e
# Server configuration may be inconsistent, add these configurations to avoid garbled code problems
export LANG=en_US.UTF-8
export LC_CTYPE=en_US.UTF-8
export LC_ALL=en_US.UTF-8

TMP_JAVA_HOME="/customize/your/java/home/here"

# Detect operating system.
OS=$(uname)

function is_java8_or_11 {
local _java="$1"
[[ -x "$_java" ]] || return 1
[[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2
return 0
}

function extract_java_version {
local _java="$1"
local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}')
echo "$version"
}

# 0(not running), 1(is running)
#function is_proxyRunning {
# local _pid="$1"
# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid`
# if [ -z "$pid" ] ; then
# return 0
# else
# return 1
# fi
#}

function get_pid {
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
fi
fi
echo "$ppid";
}

#===========================================================================================
# Locate Java Executable
#===========================================================================================

if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
JAVA="$TMP_JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
JAVA="$JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
elif is_java8_or_11 "$(which java)"; then
JAVA="$(which java)"
JAVA_VERSION=$(extract_java_version "$(which java)")
else
echo -e "ERROR\t Java 8 or 11 not found, operation abort."
exit 9;
fi

echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"

EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
export EVENTMESH_ADMIN_HOME

EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
export EVENTMESH_ADMIN_LOG_HOME

echo -e "EVENTMESH_ADMIN_HOME : ${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"

function make_logs_dir {
if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_ADMIN_LOG_HOME}"; fi
}

error_exit ()
{
echo -e "ERROR\t $1 !!"
exit 1
}

export JAVA_HOME

#===========================================================================================
# JVM Configuration
#===========================================================================================
#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4"
#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4"
#fi

GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"

#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
# Set JAVA_OPT for Java 8
JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
elif [[ "$JAVA_VERSION" == "11" ]]; then
# Set JAVA_OPT for Java 11
XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
fi
JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} -XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"

#if [ -f "pid.file" ]; then
# pid=`cat pid.file`
# if ! is_proxyRunning "$pid"; then
# echo "proxy is running already"
# exit 9;
# else
# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi

pid=$(get_pid)
if [[ $pid == "ERROR"* ]]; then
echo -e "${pid}"
exit 9
fi
if [ -n "$pid" ]; then
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
fi

make_logs_dir

echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out

EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
import java.util.Map;

import lombok.Data;
Expand All @@ -42,7 +43,7 @@ public class EventMeshJobDetail {

private String sinkConnectorDesc;

private RecordPosition position;
private List<RecordPosition> position;

private JobState state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
PositionHandlerFactory factory;

// called isValidateReportRequest before call this
public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) {
public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) {
if (request == null) {
return null;
}
Expand Down Expand Up @@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata)
return handler.handler(request, metadata);
}

public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) {
public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) {
if (jobID == null || type == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;

import java.util.List;

/**
* IFetchPositionHandler
*/
public interface IFetchPositionHandler {

RecordPosition handler(FetchPositionRequest request, Metadata metadata);
List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
}

@Override
public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) {
List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
request.getJobID()));
RecordPosition recordPosition = null;
if (position != null) {
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
return recordPosition;
return recordPositionList;
}
}
4 changes: 2 additions & 2 deletions eventmesh-admin-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
spring:
datasource:
url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: mike920830
username: //db_username
password: //db_password
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {

private Integer batchSize = 50; // batchSize
// batchSize
private Integer batchSize = 50;

private Boolean useBatch = true; // enable batch
// enable batch
private Boolean useBatch = true;

private Integer poolSize = 5; // sink thread size for single channel
// sink thread size for single channel
private Integer poolSize = 5;

private SyncMode syncMode; // sync mode: field/row
// sync mode: field/row
private SyncMode syncMode;

private Boolean skipException = false; // skip sink process exception
// skip sink process exception
private Boolean skipException = false;

public SinkConnectorConfig sinkConnectorConfig;

Expand Down
Loading

0 comments on commit ae621d4

Please sign in to comment.