Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4979] Canal Connector supports bidirectional data synchronization #5011

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ tasks.register('dist') {
["eventmesh-common",
"eventmesh-meta:eventmesh-meta-api",
"eventmesh-metrics-plugin:eventmesh-metrics-api",
"eventmesh-openconnect:eventmesh-openconnect-java",
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
"eventmesh-protocol-plugin:eventmesh-protocol-api",
"eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
"eventmesh-runtime",
"eventmesh-runtime-v2",
"eventmesh-security-plugin:eventmesh-security-api",
"eventmesh-spi",
"eventmesh-starter",
Expand Down Expand Up @@ -660,6 +664,7 @@ subprojects {

dependencyManagement {
dependencies {

dependency "org.apache.commons:commons-lang3:3.6"
dependency "org.apache.commons:commons-collections4:4.4"
dependency "org.apache.commons:commons-text:1.9"
Expand Down Expand Up @@ -709,7 +714,7 @@ subprojects {
dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
dependency "com.jayway.jsonpath:json-path:2.9.0"

dependency "org.springframework.boot:spring-boot-starter-web:2.7.18"
dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
dependency "io.openmessaging:registry-server:0.0.1"

dependency "org.junit.jupiter:junit-jupiter:5.6.0"
Expand Down
201 changes: 201 additions & 0 deletions eventmesh-admin-server/bin/start-admin.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#!/bin/bash
#
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
set -e
# Server configuration may be inconsistent, add these configurations to avoid garbled code problems
export LANG=en_US.UTF-8
export LC_CTYPE=en_US.UTF-8
export LC_ALL=en_US.UTF-8

TMP_JAVA_HOME="/customize/your/java/home/here"

# Detect operating system.
OS=$(uname)

function is_java8_or_11 {
local _java="$1"
[[ -x "$_java" ]] || return 1
[[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2
return 0
}

function extract_java_version {
local _java="$1"
local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}')
echo "$version"
}

# 0(not running), 1(is running)
#function is_proxyRunning {
# local _pid="$1"
# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid`
# if [ -z "$pid" ] ; then
# return 0
# else
# return 1
# fi
#}

function get_pid {
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
fi
fi
echo "$ppid";
}

#===========================================================================================
# Locate Java Executable
#===========================================================================================

if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
JAVA="$TMP_JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
JAVA="$JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
elif is_java8_or_11 "$(which java)"; then
JAVA="$(which java)"
JAVA_VERSION=$(extract_java_version "$(which java)")
else
echo -e "ERROR\t Java 8 or 11 not found, operation abort."
exit 9;
fi

echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"

EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
export EVENTMESH_ADMIN_HOME

EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
export EVENTMESH_ADMIN_LOG_HOME

echo -e "EVENTMESH_ADMIN_HOME : ${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"

function make_logs_dir {
if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_ADMIN_LOG_HOME}"; fi
}

error_exit ()
{
echo -e "ERROR\t $1 !!"
exit 1
}

export JAVA_HOME

#===========================================================================================
# JVM Configuration
#===========================================================================================
#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4"
#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4"
#fi

GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"

#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
# Set JAVA_OPT for Java 8
JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
elif [[ "$JAVA_VERSION" == "11" ]]; then
# Set JAVA_OPT for Java 11
XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
fi
JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} -XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"

#if [ -f "pid.file" ]; then
# pid=`cat pid.file`
# if ! is_proxyRunning "$pid"; then
# echo "proxy is running already"
# exit 9;
# else
# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi

pid=$(get_pid)
if [[ $pid == "ERROR"* ]]; then
echo -e "${pid}"
exit 9
fi
if [ -n "$pid" ]; then
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
fi

make_logs_dir

echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out

EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
import java.util.Map;

import lombok.Data;
Expand All @@ -42,7 +43,7 @@ public class EventMeshJobDetail {

private String sinkConnectorDesc;

private RecordPosition position;
private List<RecordPosition> position;

private JobState state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
PositionHandlerFactory factory;

// called isValidateReportRequest before call this
public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) {
public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) {
if (request == null) {
return null;
}
Expand Down Expand Up @@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata)
return handler.handler(request, metadata);
}

public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) {
public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) {
if (jobID == null || type == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;

import java.util.List;

/**
* IFetchPositionHandler
*/
public interface IFetchPositionHandler {

RecordPosition handler(FetchPositionRequest request, Metadata metadata);
List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
}

@Override
public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) {
List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
request.getJobID()));
RecordPosition recordPosition = null;
if (position != null) {
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
return recordPosition;
return recordPositionList;
}
}
4 changes: 2 additions & 2 deletions eventmesh-admin-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
spring:
datasource:
url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: mike920830
username: //db_username
password: //db_password
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {

private Integer batchSize = 50; // batchSize
// batchSize
private Integer batchSize = 50;

Check warning on line 31 in eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java#L31

Added line #L31 was not covered by tests

private Boolean useBatch = true; // enable batch
// enable batch
private Boolean useBatch = true;

Check warning on line 34 in eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java#L34

Added line #L34 was not covered by tests

private Integer poolSize = 5; // sink thread size for single channel
// sink thread size for single channel
private Integer poolSize = 5;

Check warning on line 37 in eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java#L37

Added line #L37 was not covered by tests

private SyncMode syncMode; // sync mode: field/row
// sync mode: field/row
private SyncMode syncMode;

Check warning on line 40 in eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java#L40

Added line #L40 was not covered by tests

private Boolean skipException = false; // skip sink process exception
// skip sink process exception
private Boolean skipException = false;

Check warning on line 43 in eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java#L43

Added line #L43 was not covered by tests

public SinkConnectorConfig sinkConnectorConfig;

Expand Down
Loading
Loading