From f09faa05fda206a66aa21b772ab6b658c97835e9 Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Mon, 9 Dec 2024 17:51:38 +0800 Subject: [PATCH] [ISSUE #5139] update canal connector module (#5140) * [ISSUE #5137] update connector runtime v2 module * fix checkStyle error * [ISSUE #5139] update canal connector module --- .../rdb/canal/CanalSinkFullConfig.java | 1 + .../rdb/canal/CanalSinkIncrementConfig.java | 4 +- .../rdb/canal/CanalSourceCheckConfig.java | 38 ++ .../rdb/canal/CanalSourceFullConfig.java | 6 +- .../rdb/canal/CanalSourceIncrementConfig.java | 16 +- .../rdb/canal/JobRdbFullPosition.java | 1 + .../rdb/canal/mysql/MySQLTableDef.java | 4 +- .../datasource/DataSourceDriverType.java | 1 + .../remote/datasource/DataSourceType.java | 1 + .../eventmesh/connector/canal/SqlUtils.java | 4 +- .../SqlBuilderLoadInterceptor.java | 25 +- .../sink/connector/CanalCheckConsumer.java | 540 ++++++++++++++++++ .../sink/connector/CanalFullConsumer.java | 391 +++++++++++++ .../connector/CanalSinkCheckConnector.java | 341 ++--------- .../connector/CanalSinkFullConnector.java | 362 ++---------- .../CanalSinkIncrementConnector.java | 2 +- .../connector/canal/source/EntryParser.java | 19 +- .../source/connector/CanalFullProducer.java | 100 +++- .../connector/CanalSourceCheckConnector.java | 126 ++-- .../connector/CanalSourceFullConnector.java | 22 +- .../CanalSourceIncrementConnector.java | 115 +++- .../position/CanalCheckPositionMgr.java | 250 ++++++++ .../source/position/CanalFullPositionMgr.java | 8 +- .../canal/source/table/RdbTableMgr.java | 82 ++- 24 files changed, 1673 insertions(+), 786 deletions(-) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceCheckConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalCheckConsumer.java create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java index f1d78a65dc..dca16b100c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java @@ -28,4 +28,5 @@ public class CanalSinkFullConfig extends SinkConfig { private SinkConnectorConfig sinkConnectorConfig; private String zeroDate; + private int parallel = 2; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java index 32112a769b..aeb9d5a0e2 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java @@ -36,9 +36,9 @@ public class CanalSinkIncrementConfig extends CanalSinkConfig { private Integer poolSize = 5; // sync mode: field/row - private SyncMode syncMode; + private SyncMode syncMode = SyncMode.ROW; - private boolean isGTIDMode = true; + private boolean isGTIDMode = false; private boolean isMariaDB = true; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceCheckConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceCheckConfig.java new file mode 100644 index 0000000000..f326301d7d --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceCheckConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.List; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class CanalSourceCheckConfig extends SourceConfig { + private SourceConnectorConfig sourceConnectorConfig; + private List startPosition; + private int parallel; + private int flushSize; + private int executePeriod = 3600; + private Integer pagePerSecond = 1; + private Integer recordPerSecond = 100; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java index 15398b303a..53988ca055 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java @@ -30,6 +30,8 @@ public class CanalSourceFullConfig extends SourceConfig { private SourceConnectorConfig sourceConnectorConfig; private List startPosition; - private int parallel; - private int flushSize; + private int parallel = 2; + private int flushSize = 20; + private Integer pagePerSecond = 1; + private Integer recordPerSecond = 100; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java index 94fe007b5f..7f73727140 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java @@ -32,17 +32,17 @@ public class CanalSourceIncrementConfig extends CanalSourceConfig { private String destination; - private Long canalInstanceId; + private Long canalInstanceId = 1L; - private String desc; + private String desc = "canalSourceInstance"; - private boolean ddlSync = true; + private boolean ddlSync = false; private boolean filterTableError = false; private Long slaveId; - private Short clientId; + private Short clientId = 1; private String serverUUID; @@ -67,19 +67,19 @@ public class CanalSourceIncrementConfig extends CanalSourceConfig { private Boolean enableRemedy = false; // sync mode: field/row - private SyncMode syncMode; + private SyncMode syncMode = SyncMode.ROW; // sync consistency - private SyncConsistency syncConsistency; + private SyncConsistency syncConsistency = SyncConsistency.BASE; // ================================= system parameter // ================================ // Column name of the bidirectional synchronization mark - private String needSyncMarkTableColumnName = "needSync"; + private String needSyncMarkTableColumnName; // Column value of the bidirectional synchronization mark - private String needSyncMarkTableColumnValue = "needSync"; + private String needSyncMarkTableColumnValue; private SourceConnectorConfig sourceConnectorConfig; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java index 08f88e1d24..42ba889bbd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java @@ -30,6 +30,7 @@ public class JobRdbFullPosition { private String tableName; private String primaryKeyRecords; private long maxCount; + private long handledRecordCount = 0; private boolean finished; private BigDecimal percent; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java index cdd3652378..4266a96060 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java @@ -19,8 +19,8 @@ import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import java.util.List; import java.util.Map; -import java.util.Set; import lombok.Data; import lombok.EqualsAndHashCode; @@ -31,6 +31,6 @@ @Data @EqualsAndHashCode(callSuper = true) public class MySQLTableDef extends RdbTableDefinition { - private Set primaryKeys; + private List primaryKeys; private Map columnDefinitions; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java index 4429bee5a9..f1c0f54e5f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java @@ -19,6 +19,7 @@ public enum DataSourceDriverType { MYSQL, + MariaDB, REDIS, ROCKETMQ, HTTP; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java index 8c40971e7b..1c14239c3b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java @@ -27,6 +27,7 @@ @ToString public enum DataSourceType { MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB), + MariaDB("MariaDB", DataSourceDriverType.MariaDB, DataSourceClassify.RDB), REDIS("Redis", DataSourceDriverType.REDIS, DataSourceClassify.CACHE), ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ), HTTP("HTTP", DataSourceDriverType.HTTP, DataSourceClassify.TUNNEL); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java index 1008ad1cf3..273f5cde4c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java @@ -780,8 +780,8 @@ public static LocalDateTime toLocalDateTime(Object value) { long nanos = ((Timestamp) value).getNanos(); return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()) .toLocalDateTime(); - } else if (value instanceof java.sql.Date) { - return ((java.sql.Date) value).toLocalDate().atTime(0, 0); + } else if (value instanceof Date) { + return ((Date) value).toLocalDate().atTime(0, 0); } else { if (!(value instanceof Time)) { return ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index 7d83bd4f3f..1d7bd35b94 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -64,6 +64,7 @@ public boolean before(CanalSinkIncrementConfig sinkConfig, CanalConnectRecord re String[] keyColumns = null; String[] otherColumns = null; if (existOldKeys) { + // update table xxx set pk = newPK where pk = oldPk keyColumns = buildColumnNames(record.getOldKeys()); otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys()); } else { @@ -71,17 +72,19 @@ public boolean before(CanalSinkIncrementConfig sinkConfig, CanalConnectRecord re otherColumns = buildColumnNames(record.getUpdatedColumns()); } - if (rowMode && !existOldKeys) { - sql = sqlTemplate.getMergeSql(schemaName, - record.getTableName(), - keyColumns, - otherColumns, - new String[] {}, - true, - shardColumns); - } else { - sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns); - } + // not support the column default not null for merge sql + // if (rowMode && !existOldKeys) { + // sql = sqlTemplate.getMergeSql(schemaName, + // record.getTableName(), + // keyColumns, + // otherColumns, + // new String[] {}, + // true, + // shardColumns); + // } else { + // sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns); + // } + sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns); } else if (type.isDelete()) { sql = sqlTemplate.getDeleteSql(schemaName, record.getTableName(), diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalCheckConsumer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalCheckConsumer.java new file mode 100644 index 0000000000..fb9a33b49f --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalCheckConsumer.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.sink.connector; + +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.SqlUtils; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.text.MessageFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import com.alibaba.druid.pool.DruidPooledConnection; +import com.fasterxml.jackson.core.type.TypeReference; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class CanalCheckConsumer { + private BlockingQueue> queue; + private RdbTableMgr tableMgr; + private CanalSinkFullConfig config; + private final DateTimeFormatter dataTimePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); + + + public CanalCheckConsumer(BlockingQueue> queue, RdbTableMgr tableMgr, CanalSinkFullConfig config) { + this.config = config; + this.queue = queue; + this.tableMgr = tableMgr; + } + + + public void start(AtomicBoolean flag) { + while (flag.get()) { + List sinkRecords = null; + try { + sinkRecords = queue.poll(2, TimeUnit.SECONDS); + if (sinkRecords == null || sinkRecords.isEmpty()) { + continue; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ConnectRecord record = sinkRecords.get(0); + Map dataMap = + JsonUtils.parseTypeReferenceObject((byte[]) record.getData(), new TypeReference>() { + }); + + List> sourceRows = JsonUtils.parseObject(dataMap.get("data").toString(), List.class); + + if (sourceRows == null || sourceRows.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("[{}] got rows data is none", this.getClass()); + } + return; + } + CanalFullRecordOffset offset = JsonUtils.parseObject(dataMap.get("offset").toString(), CanalFullRecordOffset.class); + if (offset == null || offset.getPosition() == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] got canal full offset is none", this.getClass()); + } + return; + } + + MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + if (tableDefinition == null) { + log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); + return; + } + + String sql = genTargetPkInSql(tableDefinition, sourceRows.size(), Constants.MySQLQuot, Constants.MySQLQuot, "*"); + DruidPooledConnection connection = null; + PreparedStatement statement = null; + try { + connection = DatabaseConnection.sinkDataSource.getConnection(); + statement = + connection.prepareStatement(sql); + setPrepareParams(statement, sourceRows, tableDefinition); + log.debug("select sql {}", statement.toString()); + ResultSet resultSet = statement.executeQuery(); + List> targetRows = new LinkedList<>(); + while (resultSet.next()) { + Map columnValues = new LinkedHashMap<>(); + for (Map.Entry col : + tableDefinition.getColumnDefinitions().entrySet()) { + columnValues.put(col.getKey(), readColumn(resultSet, col.getKey(), + col.getValue().getType())); + } + targetRows.add(columnValues); + } + compareData(sourceRows, targetRows, tableDefinition); + record.getCallback().onSuccess(convertToSendResult(record)); + } catch (SQLException e) { + log.warn("check sink process schema [{}] table [{}] connector check fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), + e); + LockSupport.parkNanos(3000 * 1000L); + record.getCallback().onException(buildSendExceptionContext(record, e)); + } catch (Exception e) { + log.error("check sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + record.getCallback().onException(buildSendExceptionContext(record, e)); + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + log.error("close prepare statement fail", e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.error("close db connection fail", e); + } + } + } + } + } + + private void compareData(List> sourceRows, List> targetRows, MySQLTableDef tableDefinition) { + List> differenceSource = new ArrayList<>(sourceRows); + List> differenceTarget = new ArrayList<>(targetRows); + // Find common elements and remove from difference lists + for (Map source : sourceRows) { + for (Map target : targetRows) { + if (source.equals(target)) { + differenceSource.remove(source); + differenceTarget.remove(target); + break; + } + } + } + if (!differenceSource.isEmpty()) { + log.error("source rows is not equals target rows, source rows are [{}]", differenceSource); + } + + if (!differenceTarget.isEmpty()) { + log.error("source rows is not equals target rows, target rows are [{}]", differenceTarget); + } + } + + private void setPrepareParams(PreparedStatement preparedStatement, List> rows, MySQLTableDef tableDef) throws Exception { + List cols = new ArrayList<>(tableDef.getColumnDefinitions().values()); + int index = 0; + for (Map col : rows) { + for (MySQLColumnDef mySQLColumnDef : cols) { + if (tableDef.getPrimaryKeys().contains(mySQLColumnDef.getName())) { + index++; + writeColumn(preparedStatement, index, mySQLColumnDef, col.get(mySQLColumnDef.getName())); + } + } + } + } + + public Object readColumn(ResultSet rs, String colName, CanalMySQLType colType) throws Exception { + switch (colType) { + case TINYINT: + case SMALLINT: + case MEDIUMINT: + case INT: + Long valueLong = rs.getLong(colName); + if (rs.wasNull()) { + return null; + } + if (valueLong.compareTo((long) Integer.MAX_VALUE) > 0) { + return valueLong; + } + return valueLong.intValue(); + case BIGINT: + String v = rs.getString(colName); + if (v == null) { + return null; + } + BigDecimal valueBigInt = new BigDecimal(v); + if (valueBigInt.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) { + return valueBigInt; + } + return valueBigInt.longValue(); + case FLOAT: + case DOUBLE: + case DECIMAL: + return rs.getBigDecimal(colName); + case DATE: + return rs.getObject(colName, LocalDate.class).toString(); + case TIME: + return rs.getObject(colName, LocalTime.class).toString(); + case DATETIME: + case TIMESTAMP: + return rs.getObject(colName, LocalDateTime.class).toString(); + case YEAR: + int year = rs.getInt(colName); + if (rs.wasNull()) { + return null; + } + return year; + case CHAR: + case VARCHAR: + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + case JSON: + return rs.getString(colName); + case BIT: + case BINARY: + case VARBINARY: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + return rs.getBytes(colName); + case GEOMETRY: + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + byte[] geo = rs.getBytes(colName); + if (geo == null) { + return null; + } + return SqlUtils.toGeometry(geo); + default: + return rs.getObject(colName); + } + } + + public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception { + if (colType == null) { + String colVal = null; + if (value != null) { + colVal = value.toString(); + } + if (colVal == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, colVal); + } + } else if (value == null) { + ps.setNull(index, colType.getJdbcType().getVendorTypeNumber()); + } else { + switch (colType.getType()) { + case TINYINT: + case SMALLINT: + case MEDIUMINT: + case INT: + Long longValue = SqlUtils.toLong(value); + if (longValue == null) { + ps.setNull(index, 4); + return; + } else { + ps.setLong(index, longValue); + return; + } + case BIGINT: + case DECIMAL: + BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value); + if (bigDecimalValue == null) { + ps.setNull(index, 3); + return; + } else { + ps.setBigDecimal(index, bigDecimalValue); + return; + } + case FLOAT: + case DOUBLE: + Double doubleValue = SqlUtils.toDouble(value); + if (doubleValue == null) { + ps.setNull(index, 8); + } else { + ps.setDouble(index, doubleValue); + } + return; + case DATE: + case DATETIME: + case TIMESTAMP: + LocalDateTime dateValue = null; + if (!SqlUtils.isZeroTime(value)) { + try { + dateValue = SqlUtils.toLocalDateTime(value); + } catch (Exception e) { + ps.setString(index, SqlUtils.convertToString(value)); + return; + } + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + dateValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setObject(index, value); + return; + } + if (dateValue == null) { + ps.setNull(index, Types.TIMESTAMP); + } else { + ps.setString(index, dataTimePattern.format(dateValue)); + } + return; + case TIME: + String timeValue = SqlUtils.toMySqlTime(value); + if (StringUtils.isBlank(timeValue)) { + ps.setNull(index, 12); + return; + } else { + ps.setString(index, timeValue); + return; + } + case YEAR: + LocalDateTime yearValue = null; + if (!SqlUtils.isZeroTime(value)) { + yearValue = SqlUtils.toLocalDateTime(value); + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + yearValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setInt(index, 0); + return; + } + if (yearValue == null) { + ps.setNull(index, 4); + } else { + ps.setInt(index, yearValue.getYear()); + } + return; + case CHAR: + case VARCHAR: + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + String strValue = value.toString(); + if (strValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } else { + ps.setString(index, strValue); + return; + } + case JSON: + String jsonValue = value.toString(); + if (jsonValue == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, jsonValue); + } + return; + case BIT: + if (value instanceof Boolean) { + byte[] arrayBoolean = new byte[1]; + arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0); + ps.setBytes(index, arrayBoolean); + return; + } else if (value instanceof Number) { + ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value)); + return; + } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) { + byte[] arrayBoolean = SqlUtils.toBytes(value); + if (arrayBoolean == null || arrayBoolean.length == 0) { + ps.setNull(index, Types.BIT); + return; + } else { + ps.setBytes(index, arrayBoolean); + return; + } + } else { + ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value))); + return; + } + case BINARY: + case VARBINARY: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + byte[] binaryValue = SqlUtils.toBytes(value); + if (binaryValue == null) { + ps.setNull(index, Types.BINARY); + return; + } else { + ps.setBytes(index, binaryValue); + return; + } + case GEOMETRY: + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + String geoValue = SqlUtils.toGeometry(value); + if (geoValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } + ps.setString(index, geoValue); + return; + default: + throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); + } + } + } + + public String genTargetPkInSql(MySQLTableDef def, int pkGroupSize, String leftQuote, String rightQuote, String selectEleStr) { + List pkCols = def.getPrimaryKeys(); + if (pkCols == null || pkCols.isEmpty()) { + throw new IllegalArgumentException("unsupported pk is empty table check."); + } else if (pkCols.size() == 1) { + return genSinglePkInSql(def, pkGroupSize, leftQuote, rightQuote, selectEleStr); + } else { + return genMultiPkInSql(def, pkGroupSize, leftQuote, rightQuote, selectEleStr); + } + } + + public String genSinglePkInSql(MySQLTableDef def, int pkGroupSize, String leftQuote, String rightQuote, String selectEleStr) { + return MessageFormat.format(genFetchSqlFormat(leftQuote, rightQuote, selectEleStr), def.getSchemaName(), def.getTableName(), + leftQuote + def.getPrimaryKeys().get(0) + rightQuote, genSinglePkPlaceHolderStr(pkGroupSize)); + } + + public String genMultiPkInSql(MySQLTableDef def, int pkGroupSize, String leftQuote, String rightQuote, String selectEleStr) { + String fetchSqlFormat = genFetchSqlFormat(leftQuote, rightQuote, selectEleStr); + List pkCols = def.getPrimaryKeys(); + StringBuilder pksBuilder = new StringBuilder("("); + for (int i = 0; i < pkCols.size(); i++) { + if (i != 0) { + pksBuilder.append(","); + } + pksBuilder.append(leftQuote).append(pkCols.get(i)).append(rightQuote); + } + pksBuilder.append(")"); + return MessageFormat.format(fetchSqlFormat, def.getSchemaName(), def.getTableName(), pksBuilder.toString(), + genMultiPkPlaceHolderStr(pkGroupSize, pkCols.size())); + } + + public String genFetchSqlFormat(String leftQuote, String rightQuote, String selectEleStr) { + return "select " + selectEleStr + " from " + leftQuote + "{0}" + rightQuote + "." + leftQuote + "{1}" + rightQuote + " where {2} in ({3})"; + } + + public String genSinglePkPlaceHolderStr(int valueSize) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < valueSize; i++) { + if (i != 0) { + sb.append(","); + } + sb.append("?"); + } + return sb.toString(); + } + + public String genMultiPkPlaceHolderStr(int valueSize, int sizePerGroup) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < valueSize; i++) { + if (i != 0) { + sb.append(","); + } + sb.append("("); + for (int j = 0; j < sizePerGroup; j++) { + if (j != 0) { + sb.append(","); + } + sb.append("?"); + } + sb.append(")"); + } + return sb.toString(); + } + + + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if (StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); + } + return sendExceptionContext; + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java new file mode 100644 index 0000000000..939d1101aa --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.sink.connector; + +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.SqlUtils; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import com.alibaba.druid.pool.DruidPooledConnection; +import com.fasterxml.jackson.core.type.TypeReference; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class CanalFullConsumer { + private BlockingQueue> queue; + private RdbTableMgr tableMgr; + private CanalSinkFullConfig config; + private final DateTimeFormatter dataTimePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); + + + public CanalFullConsumer(BlockingQueue> queue, RdbTableMgr tableMgr, CanalSinkFullConfig config) { + this.config = config; + this.queue = queue; + this.tableMgr = tableMgr; + } + + + public void start(AtomicBoolean flag) { + while (flag.get()) { + List sinkRecords = null; + try { + sinkRecords = queue.poll(2, TimeUnit.SECONDS); + if (sinkRecords == null || sinkRecords.isEmpty()) { + continue; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ConnectRecord record = sinkRecords.get(0); + Map dataMap = + JsonUtils.parseTypeReferenceObject((byte[]) record.getData(), new TypeReference>() { + }); + + List> rows = JsonUtils.parseObject(dataMap.get("data").toString(), List.class); + + if (rows == null || rows.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("[{}] got rows data is none", this.getClass()); + } + return; + } + CanalFullRecordOffset offset = JsonUtils.parseObject(dataMap.get("offset").toString(), CanalFullRecordOffset.class); + if (offset == null || offset.getPosition() == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] got canal full offset is none", this.getClass()); + } + return; + } + + MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + if (tableDefinition == null) { + log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); + return; + } + List cols = new ArrayList<>(tableDefinition.getColumnDefinitions().values()); + String sql = generateInsertPrepareSql(offset.getPosition().getSchema(), offset.getPosition().getTableName(), + cols); + DruidPooledConnection connection = null; + PreparedStatement statement = null; + try { + connection = DatabaseConnection.sinkDataSource.getConnection(); + statement = + connection.prepareStatement(sql); + for (Map col : rows) { + setPrepareParams(statement, col, cols); + log.debug("insert sql {}", statement.toString()); + statement.addBatch(); + } + statement.executeBatch(); + connection.commit(); + log.info("execute batch insert sql size: {}", rows.size()); + record.getCallback().onSuccess(convertToSendResult(record)); + } catch (SQLException e) { + log.warn("full sink process schema [{}] table [{}] connector write fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), + e); + LockSupport.parkNanos(3000 * 1000L); + record.getCallback().onException(buildSendExceptionContext(record, e)); + } catch (Exception e) { + log.error("full sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + record.getCallback().onException(buildSendExceptionContext(record, e)); + try { + if (connection != null && !connection.isClosed()) { + connection.rollback(); + } + } catch (SQLException rollback) { + log.warn("full sink process schema [{}] table [{}] rollback fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + log.error("close prepare statement fail", e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.error("close db connection fail", e); + } + } + } + } + } + + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if (StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); + } + return sendExceptionContext; + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } + + private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { + for (int i = 0; i < columnDefs.size(); i++) { + writeColumn(preparedStatement, i + 1, columnDefs.get(i), col.get(columnDefs.get(i).getName())); + } + } + + public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception { + if (colType == null) { + String colVal = null; + if (value != null) { + colVal = value.toString(); + } + if (colVal == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, colVal); + } + } else if (value == null) { + ps.setNull(index, colType.getJdbcType().getVendorTypeNumber()); + } else { + switch (colType.getType()) { + case TINYINT: + case SMALLINT: + case MEDIUMINT: + case INT: + Long longValue = SqlUtils.toLong(value); + if (longValue == null) { + ps.setNull(index, 4); + return; + } else { + ps.setLong(index, longValue); + return; + } + case BIGINT: + case DECIMAL: + BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value); + if (bigDecimalValue == null) { + ps.setNull(index, 3); + return; + } else { + ps.setBigDecimal(index, bigDecimalValue); + return; + } + case FLOAT: + case DOUBLE: + Double doubleValue = SqlUtils.toDouble(value); + if (doubleValue == null) { + ps.setNull(index, 8); + } else { + ps.setDouble(index, doubleValue); + } + return; + case DATE: + case DATETIME: + case TIMESTAMP: + LocalDateTime dateValue = null; + if (!SqlUtils.isZeroTime(value)) { + try { + dateValue = SqlUtils.toLocalDateTime(value); + } catch (Exception e) { + ps.setString(index, SqlUtils.convertToString(value)); + return; + } + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + dateValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setObject(index, value); + return; + } + if (dateValue == null) { + ps.setNull(index, Types.TIMESTAMP); + } else { + ps.setString(index, dataTimePattern.format(dateValue)); + } + return; + case TIME: + String timeValue = SqlUtils.toMySqlTime(value); + if (StringUtils.isBlank(timeValue)) { + ps.setNull(index, 12); + return; + } else { + ps.setString(index, timeValue); + return; + } + case YEAR: + LocalDateTime yearValue = null; + if (!SqlUtils.isZeroTime(value)) { + yearValue = SqlUtils.toLocalDateTime(value); + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + yearValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setInt(index, 0); + return; + } + if (yearValue == null) { + ps.setNull(index, 4); + } else { + ps.setInt(index, yearValue.getYear()); + } + return; + case CHAR: + case VARCHAR: + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + String strValue = value.toString(); + if (strValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } else { + ps.setString(index, strValue); + return; + } + case JSON: + String jsonValue = value.toString(); + if (jsonValue == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, jsonValue); + } + return; + case BIT: + if (value instanceof Boolean) { + byte[] arrayBoolean = new byte[1]; + arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0); + ps.setBytes(index, arrayBoolean); + return; + } else if (value instanceof Number) { + ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value)); + return; + } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) { + byte[] arrayBoolean = SqlUtils.toBytes(value); + if (arrayBoolean == null || arrayBoolean.length == 0) { + ps.setNull(index, Types.BIT); + return; + } else { + ps.setBytes(index, arrayBoolean); + return; + } + } else { + ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value))); + return; + } + case BINARY: + case VARBINARY: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + byte[] binaryValue = SqlUtils.toBytes(value); + if (binaryValue == null) { + ps.setNull(index, Types.BINARY); + return; + } else { + ps.setBytes(index, binaryValue); + return; + } + case GEOMETRY: + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + String geoValue = SqlUtils.toGeometry(value); + if (geoValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } + ps.setString(index, geoValue); + return; + default: + throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); + } + } + } + + private String generateInsertPrepareSql(String schema, String table, List cols) { + StringBuilder builder = new StringBuilder(); + builder.append("INSERT IGNORE INTO "); + builder.append(Constants.MySQLQuot); + builder.append(schema); + builder.append(Constants.MySQLQuot); + builder.append("."); + builder.append(Constants.MySQLQuot); + builder.append(table); + builder.append(Constants.MySQLQuot); + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (MySQLColumnDef colInfo : cols) { + if (columns.length() > 0) { + columns.append(", "); + values.append(", "); + } + String wrapName = Constants.MySQLQuot + colInfo.getName() + Constants.MySQLQuot; + columns.append(wrapName); + values.append(colInfo.getType() == null ? "?" : colInfo.getType().genPrepareStatement4Insert()); + } + builder.append("(").append(columns).append(")"); + builder.append(" VALUES "); + builder.append("(").append(values).append(")"); + return builder.toString(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java index 84e01ca85c..6819c936fd 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java @@ -17,44 +17,38 @@ package org.apache.eventmesh.connector.canal.sink.connector; +import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; import org.apache.eventmesh.connector.canal.DatabaseConnection; -import org.apache.eventmesh.connector.canal.SqlUtils; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.sink.Sink; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; -import org.apache.commons.lang3.StringUtils; - -import java.math.BigDecimal; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.LockSupport; - -import com.alibaba.druid.pool.DruidPooledConnection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @Slf4j public class CanalSinkCheckConnector implements Sink, ConnectorCreateService { + private CanalSinkFullConfig config; private RdbTableMgr tableMgr; - private final DateTimeFormatter dataTimePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); + private ThreadPoolExecutor executor; + private final BlockingQueue> queue = new LinkedBlockingQueue<>(10000); + private final AtomicBoolean flag = new AtomicBoolean(true); @Override public void start() throws Exception { @@ -63,7 +57,23 @@ public void start() throws Exception { @Override public void stop() throws Exception { - + flag.set(false); + if (!executor.isShutdown()) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("wait thread pool shutdown timeout, it will shutdown now"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("shutdown thread pool fail"); + } + } + if (DatabaseConnection.sinkDataSource != null) { + DatabaseConnection.sinkDataSource.close(); + log.info("data source has been closed"); + } } @Override @@ -84,7 +94,8 @@ public void init(Config config) throws Exception { @Override public void init(ConnectorContext connectorContext) throws Exception { - this.config = (CanalSinkFullConfig) ((SinkConnectorContext) connectorContext).getSinkConfig(); + CanalSinkConfig canalSinkConfig = (CanalSinkConfig) ((SinkConnectorContext) connectorContext).getSinkConfig(); + this.config = ConfigUtil.parse(canalSinkConfig.getSinkConfig(), CanalSinkFullConfig.class); init(); } @@ -97,6 +108,14 @@ private void init() { DatabaseConnection.sinkDataSource.setDefaultAutoCommit(false); tableMgr = new RdbTableMgr(this.config.getSinkConnectorConfig(), DatabaseConnection.sinkDataSource); + executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-sink-check")); + List consumers = new LinkedList<>(); + for (int i = 0; i < config.getParallel(); i++) { + CanalCheckConsumer canalCheckConsumer = new CanalCheckConsumer(queue, tableMgr, config); + consumers.add(canalCheckConsumer); + } + consumers.forEach(c -> executor.execute(() -> c.start(flag))); } @Override @@ -122,285 +141,11 @@ public void put(List sinkRecords) { } return; } - ConnectRecord record = sinkRecords.get(0); - List> data = (List>) record.getData(); - if (data == null || data.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("[{}] got rows data is none", this.getClass()); - } - return; - } - CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getPosition().getRecordOffset(); - if (offset == null || offset.getPosition() == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] got canal full offset is none", this.getClass()); - } - return; - } - - MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); - if (tableDefinition == null) { - log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); - return; - } - List cols = new ArrayList<>(tableDefinition.getColumnDefinitions().values()); - String sql = generateInsertPrepareSql(offset.getPosition().getSchema(), offset.getPosition().getTableName(), - cols); - DruidPooledConnection connection = null; - PreparedStatement statement = null; try { - connection = DatabaseConnection.sinkDataSource.getConnection(); - statement = - connection.prepareStatement(sql); - for (Map col : data) { - setPrepareParams(statement, col, cols); - log.info("insert sql {}", statement.toString()); - statement.addBatch(); - } - statement.executeBatch(); - connection.commit(); - } catch (SQLException e) { - log.warn("full sink process schema [{}] table [{}] connector write fail", tableDefinition.getSchemaName(), tableDefinition.getTableName(), - e); - LockSupport.parkNanos(3000 * 1000L); - } catch (Exception e) { - log.error("full sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), - tableDefinition.getTableName(), e); - try { - if (connection != null && !connection.isClosed()) { - connection.rollback(); - } - } catch (SQLException rollback) { - log.warn("full sink process schema [{}] table [{}] rollback fail", tableDefinition.getSchemaName(), - tableDefinition.getTableName(), e); - } - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - log.info("close prepare statement fail", e); - } - } - - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - log.info("close db connection fail", e); - } - } - } - } - - private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { - for (int i = 0; i < columnDefs.size(); i++) { - writeColumn(preparedStatement, i + 1, columnDefs.get(i), col.get(columnDefs.get(i).getName())); + queue.put(sinkRecords); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } - public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception { - if (colType == null) { - String colVal = null; - if (value != null) { - colVal = value.toString(); - } - if (colVal == null) { - ps.setNull(index, Types.VARCHAR); - } else { - ps.setString(index, colVal); - } - } else if (value == null) { - ps.setNull(index, colType.getJdbcType().getVendorTypeNumber()); - } else { - switch (colType.getType()) { - case TINYINT: - case SMALLINT: - case MEDIUMINT: - case INT: - Long longValue = SqlUtils.toLong(value); - if (longValue == null) { - ps.setNull(index, 4); - return; - } else { - ps.setLong(index, longValue); - return; - } - case BIGINT: - case DECIMAL: - BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value); - if (bigDecimalValue == null) { - ps.setNull(index, 3); - return; - } else { - ps.setBigDecimal(index, bigDecimalValue); - return; - } - case FLOAT: - case DOUBLE: - Double doubleValue = SqlUtils.toDouble(value); - if (doubleValue == null) { - ps.setNull(index, 8); - } else { - ps.setDouble(index, doubleValue); - } - return; - case DATE: - case DATETIME: - case TIMESTAMP: - LocalDateTime dateValue = null; - if (!SqlUtils.isZeroTime(value)) { - try { - dateValue = SqlUtils.toLocalDateTime(value); - } catch (Exception e) { - ps.setString(index, SqlUtils.convertToString(value)); - return; - } - } else if (StringUtils.isNotBlank(config.getZeroDate())) { - dateValue = SqlUtils.toLocalDateTime(config.getZeroDate()); - } else { - ps.setObject(index, value); - return; - } - if (dateValue == null) { - ps.setNull(index, Types.TIMESTAMP); - } else { - ps.setString(index, dataTimePattern.format(dateValue)); - } - return; - case TIME: - String timeValue = SqlUtils.toMySqlTime(value); - if (StringUtils.isBlank(timeValue)) { - ps.setNull(index, 12); - return; - } else { - ps.setString(index, timeValue); - return; - } - case YEAR: - LocalDateTime yearValue = null; - if (!SqlUtils.isZeroTime(value)) { - yearValue = SqlUtils.toLocalDateTime(value); - } else if (StringUtils.isNotBlank(config.getZeroDate())) { - yearValue = SqlUtils.toLocalDateTime(config.getZeroDate()); - } else { - ps.setInt(index, 0); - return; - } - if (yearValue == null) { - ps.setNull(index, 4); - } else { - ps.setInt(index, yearValue.getYear()); - } - return; - case CHAR: - case VARCHAR: - case TINYTEXT: - case TEXT: - case MEDIUMTEXT: - case LONGTEXT: - case ENUM: - case SET: - String strValue = value.toString(); - if (strValue == null) { - ps.setNull(index, Types.VARCHAR); - return; - } else { - ps.setString(index, strValue); - return; - } - case JSON: - String jsonValue = value.toString(); - if (jsonValue == null) { - ps.setNull(index, Types.VARCHAR); - } else { - ps.setString(index, jsonValue); - } - return; - case BIT: - if (value instanceof Boolean) { - byte[] arrayBoolean = new byte[1]; - arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0); - ps.setBytes(index, arrayBoolean); - return; - } else if (value instanceof Number) { - ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value)); - return; - } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) { - byte[] arrayBoolean = SqlUtils.toBytes(value); - if (arrayBoolean == null || arrayBoolean.length == 0) { - ps.setNull(index, Types.BIT); - return; - } else { - ps.setBytes(index, arrayBoolean); - return; - } - } else { - ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value))); - return; - } - case BINARY: - case VARBINARY: - case TINYBLOB: - case BLOB: - case MEDIUMBLOB: - case LONGBLOB: - byte[] binaryValue = SqlUtils.toBytes(value); - if (binaryValue == null) { - ps.setNull(index, Types.BINARY); - return; - } else { - ps.setBytes(index, binaryValue); - return; - } - case GEOMETRY: - case GEOMETRY_COLLECTION: - case GEOM_COLLECTION: - case POINT: - case LINESTRING: - case POLYGON: - case MULTIPOINT: - case MULTILINESTRING: - case MULTIPOLYGON: - String geoValue = SqlUtils.toGeometry(value); - if (geoValue == null) { - ps.setNull(index, Types.VARCHAR); - return; - } - ps.setString(index, geoValue); - return; - default: - throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); - } - } - } - - private String generateInsertPrepareSql(String schema, String table, List cols) { - StringBuilder builder = new StringBuilder(); - builder.append("INSERT IGNORE INTO "); - builder.append(Constants.MySQLQuot); - builder.append(schema); - builder.append(Constants.MySQLQuot); - builder.append("."); - builder.append(Constants.MySQLQuot); - builder.append(table); - builder.append(Constants.MySQLQuot); - StringBuilder columns = new StringBuilder(); - StringBuilder values = new StringBuilder(); - for (MySQLColumnDef colInfo : cols) { - if (columns.length() > 0) { - columns.append(", "); - values.append(", "); - } - String wrapName = Constants.MySQLQuot + colInfo.getName() + Constants.MySQLQuot; - columns.append(wrapName); - values.append(colInfo.getType() == null ? "?" : colInfo.getType().genPrepareStatement4Insert()); - } - builder.append("(").append(columns).append(")"); - builder.append(" VALUES "); - builder.append("(").append(values).append(")"); - return builder.toString(); - } - - } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java index 4137123922..cb50dc5648 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -17,42 +17,27 @@ package org.apache.eventmesh.connector.canal.sink.connector; +import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; -import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; -import org.apache.eventmesh.connector.canal.SqlUtils; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.sink.Sink; -import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; -import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.ConfigUtil; -import org.apache.commons.lang3.StringUtils; - -import java.math.BigDecimal; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.LockSupport; - -import com.alibaba.druid.pool.DruidPooledConnection; -import com.fasterxml.jackson.core.type.TypeReference; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -61,7 +46,9 @@ public class CanalSinkFullConnector implements Sink, ConnectorCreateService> queue = new LinkedBlockingQueue<>(10000); + private final AtomicBoolean flag = new AtomicBoolean(true); @Override public void start() throws Exception { @@ -70,7 +57,23 @@ public void start() throws Exception { @Override public void stop() throws Exception { - + flag.set(false); + if (!executor.isShutdown()) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("wait thread pool shutdown timeout, it will shutdown now"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("shutdown thread pool fail"); + } + } + if (DatabaseConnection.sinkDataSource != null) { + DatabaseConnection.sinkDataSource.close(); + log.info("data source has been closed"); + } } @Override @@ -106,6 +109,14 @@ private void init() { DatabaseConnection.sinkDataSource.setDefaultAutoCommit(false); tableMgr = new RdbTableMgr(this.config.getSinkConnectorConfig(), DatabaseConnection.sinkDataSource); + executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-sink-full")); + List consumers = new LinkedList<>(); + for (int i = 0; i < config.getParallel(); i++) { + CanalFullConsumer canalFullConsumer = new CanalFullConsumer(queue, tableMgr, config); + consumers.add(canalFullConsumer); + } + consumers.forEach(c -> executor.execute(() -> c.start(flag))); } @Override @@ -131,309 +142,12 @@ public void put(List sinkRecords) { } return; } - ConnectRecord record = sinkRecords.get(0); - List> data = - JsonUtils.parseTypeReferenceObject((byte[]) record.getData(), new TypeReference>>() { - }); - if (data == null || data.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("[{}] got rows data is none", this.getClass()); - } - return; - } - CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getPosition().getRecordOffset(); - if (offset == null || offset.getPosition() == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] got canal full offset is none", this.getClass()); - } - return; - } - - MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); - if (tableDefinition == null) { - log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); - return; - } - List cols = new ArrayList<>(tableDefinition.getColumnDefinitions().values()); - String sql = generateInsertPrepareSql(offset.getPosition().getSchema(), offset.getPosition().getTableName(), - cols); - DruidPooledConnection connection = null; - PreparedStatement statement = null; try { - connection = DatabaseConnection.sinkDataSource.getConnection(); - statement = - connection.prepareStatement(sql); - for (Map col : data) { - setPrepareParams(statement, col, cols); - log.info("insert sql {}", statement.toString()); - statement.addBatch(); - } - statement.executeBatch(); - connection.commit(); - record.getCallback().onSuccess(convertToSendResult(record)); - } catch (SQLException e) { - log.warn("full sink process schema [{}] table [{}] connector write fail", tableDefinition.getSchemaName(), tableDefinition.getTableName(), - e); - LockSupport.parkNanos(3000 * 1000L); - record.getCallback().onException(buildSendExceptionContext(record, e)); - } catch (Exception e) { - log.error("full sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), - tableDefinition.getTableName(), e); - record.getCallback().onException(buildSendExceptionContext(record, e)); - try { - if (connection != null && !connection.isClosed()) { - connection.rollback(); - } - } catch (SQLException rollback) { - log.warn("full sink process schema [{}] table [{}] rollback fail", tableDefinition.getSchemaName(), - tableDefinition.getTableName(), e); - } - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - log.info("close prepare statement fail", e); - } - } - - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - log.info("close db connection fail", e); - } - } - } - } - - private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { - SendExceptionContext sendExceptionContext = new SendExceptionContext(); - sendExceptionContext.setMessageId(record.getRecordId()); - sendExceptionContext.setCause(e); - if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { - sendExceptionContext.setTopic(record.getExtension("topic")); - } - return sendExceptionContext; - } - - private SendResult convertToSendResult(ConnectRecord record) { - SendResult result = new SendResult(); - result.setMessageId(record.getRecordId()); - if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { - result.setTopic(record.getExtension("topic")); - } - return result; - } - - private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { - for (int i = 0; i < columnDefs.size(); i++) { - writeColumn(preparedStatement, i + 1, columnDefs.get(i), col.get(columnDefs.get(i).getName())); - } - } - - public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception { - if (colType == null) { - String colVal = null; - if (value != null) { - colVal = value.toString(); - } - if (colVal == null) { - ps.setNull(index, Types.VARCHAR); - } else { - ps.setString(index, colVal); - } - } else if (value == null) { - ps.setNull(index, colType.getJdbcType().getVendorTypeNumber()); - } else { - switch (colType.getType()) { - case TINYINT: - case SMALLINT: - case MEDIUMINT: - case INT: - Long longValue = SqlUtils.toLong(value); - if (longValue == null) { - ps.setNull(index, 4); - return; - } else { - ps.setLong(index, longValue); - return; - } - case BIGINT: - case DECIMAL: - BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value); - if (bigDecimalValue == null) { - ps.setNull(index, 3); - return; - } else { - ps.setBigDecimal(index, bigDecimalValue); - return; - } - case FLOAT: - case DOUBLE: - Double doubleValue = SqlUtils.toDouble(value); - if (doubleValue == null) { - ps.setNull(index, 8); - } else { - ps.setDouble(index, doubleValue); - } - return; - case DATE: - case DATETIME: - case TIMESTAMP: - LocalDateTime dateValue = null; - if (!SqlUtils.isZeroTime(value)) { - try { - dateValue = SqlUtils.toLocalDateTime(value); - } catch (Exception e) { - ps.setString(index, SqlUtils.convertToString(value)); - return; - } - } else if (StringUtils.isNotBlank(config.getZeroDate())) { - dateValue = SqlUtils.toLocalDateTime(config.getZeroDate()); - } else { - ps.setObject(index, value); - return; - } - if (dateValue == null) { - ps.setNull(index, Types.TIMESTAMP); - } else { - ps.setString(index, dataTimePattern.format(dateValue)); - } - return; - case TIME: - String timeValue = SqlUtils.toMySqlTime(value); - if (StringUtils.isBlank(timeValue)) { - ps.setNull(index, 12); - return; - } else { - ps.setString(index, timeValue); - return; - } - case YEAR: - LocalDateTime yearValue = null; - if (!SqlUtils.isZeroTime(value)) { - yearValue = SqlUtils.toLocalDateTime(value); - } else if (StringUtils.isNotBlank(config.getZeroDate())) { - yearValue = SqlUtils.toLocalDateTime(config.getZeroDate()); - } else { - ps.setInt(index, 0); - return; - } - if (yearValue == null) { - ps.setNull(index, 4); - } else { - ps.setInt(index, yearValue.getYear()); - } - return; - case CHAR: - case VARCHAR: - case TINYTEXT: - case TEXT: - case MEDIUMTEXT: - case LONGTEXT: - case ENUM: - case SET: - String strValue = value.toString(); - if (strValue == null) { - ps.setNull(index, Types.VARCHAR); - return; - } else { - ps.setString(index, strValue); - return; - } - case JSON: - String jsonValue = value.toString(); - if (jsonValue == null) { - ps.setNull(index, Types.VARCHAR); - } else { - ps.setString(index, jsonValue); - } - return; - case BIT: - if (value instanceof Boolean) { - byte[] arrayBoolean = new byte[1]; - arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0); - ps.setBytes(index, arrayBoolean); - return; - } else if (value instanceof Number) { - ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value)); - return; - } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) { - byte[] arrayBoolean = SqlUtils.toBytes(value); - if (arrayBoolean == null || arrayBoolean.length == 0) { - ps.setNull(index, Types.BIT); - return; - } else { - ps.setBytes(index, arrayBoolean); - return; - } - } else { - ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value))); - return; - } - case BINARY: - case VARBINARY: - case TINYBLOB: - case BLOB: - case MEDIUMBLOB: - case LONGBLOB: - byte[] binaryValue = SqlUtils.toBytes(value); - if (binaryValue == null) { - ps.setNull(index, Types.BINARY); - return; - } else { - ps.setBytes(index, binaryValue); - return; - } - case GEOMETRY: - case GEOMETRY_COLLECTION: - case GEOM_COLLECTION: - case POINT: - case LINESTRING: - case POLYGON: - case MULTIPOINT: - case MULTILINESTRING: - case MULTIPOLYGON: - String geoValue = SqlUtils.toGeometry(value); - if (geoValue == null) { - ps.setNull(index, Types.VARCHAR); - return; - } - ps.setString(index, geoValue); - return; - default: - throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); - } + queue.put(sinkRecords); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - } - private String generateInsertPrepareSql(String schema, String table, List cols) { - StringBuilder builder = new StringBuilder(); - builder.append("INSERT IGNORE INTO "); - builder.append(Constants.MySQLQuot); - builder.append(schema); - builder.append(Constants.MySQLQuot); - builder.append("."); - builder.append(Constants.MySQLQuot); - builder.append(table); - builder.append(Constants.MySQLQuot); - StringBuilder columns = new StringBuilder(); - StringBuilder values = new StringBuilder(); - for (MySQLColumnDef colInfo : cols) { - if (columns.length() > 0) { - columns.append(", "); - values.append(", "); - } - String wrapName = Constants.MySQLQuot + colInfo.getName() + Constants.MySQLQuot; - columns.append(wrapName); - values.append(colInfo.getType() == null ? "?" : colInfo.getType().genPrepareStatement4Insert()); - } - builder.append("(").append(columns).append(")"); - builder.append(" VALUES "); - builder.append("(").append(values).append(")"); - return builder.toString(); } - } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java index e165a5ffe6..84373ae7a7 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java @@ -680,7 +680,7 @@ public int getBatchSize() { } catch (Exception e) { // rollback status.setRollbackOnly(); - throw new RuntimeException("Failed to execute batch with GTID", e); + throw new RuntimeException("Failed to execute batch ", e); } finally { lobCreator.close(); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 5a6ceb7c3f..d7388c628b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -69,6 +69,7 @@ public static Map> parse(CanalSourceIncrementConf // if not gtid mode, need check weather the entry is loopback by specified column value needSync = checkNeedSync(sourceConfig, rowChange); if (needSync) { + log.debug("entry evenType {}|rowChange {}", rowChange.getEventType(), rowChange); transactionDataBuffer.add(entry); } } @@ -76,14 +77,27 @@ public static Map> parse(CanalSourceIncrementConf case TRANSACTIONEND: parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer, tables); if (!recordList.isEmpty()) { - recordMap.put(entry.getHeader().getLogfileOffset(), recordList); + List transactionEndList = new ArrayList<>(recordList); + recordMap.put(entry.getHeader().getLogfileOffset(), transactionEndList); } + recordList.clear(); transactionDataBuffer.clear(); break; default: break; } } + + // add last data in transactionDataBuffer, in case no TRANSACTIONEND + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer, tables); + if (!recordList.isEmpty()) { + List transactionEndList = new ArrayList<>(recordList); + CanalConnectRecord lastCanalConnectRecord = transactionEndList.get(transactionEndList.size() - 1); + recordMap.put(lastCanalConnectRecord.getBinLogOffset(), transactionEndList); + } + recordList.clear(); + transactionDataBuffer.clear(); + } catch (Exception e) { throw new RuntimeException(e); } @@ -118,6 +132,9 @@ private static void parseRecordListWithEntryBuffer(CanalSourceIncrementConfig so private static boolean checkNeedSync(CanalSourceIncrementConfig sourceConfig, RowChange rowChange) { Column markedColumn = null; CanalEntry.EventType eventType = rowChange.getEventType(); + if (StringUtils.isEmpty(sourceConfig.getNeedSyncMarkTableColumnName())) { + return true; + } if (eventType.equals(CanalEntry.EventType.DELETE)) { markedColumn = getColumnIgnoreCase(rowChange.getRowDatas(0).getBeforeColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java index c0b2063d28..644b77247d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java @@ -32,6 +32,7 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.math.BigDecimal; +import java.math.RoundingMode; import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.PreparedStatement; @@ -45,17 +46,22 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import javax.sql.DataSource; +import com.google.common.util.concurrent.RateLimiter; + +import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -64,26 +70,34 @@ public class CanalFullProducer { private BlockingQueue> queue; private final DataSource dataSource; private final MySQLTableDef tableDefinition; - private final TableFullPosition position; + private final TableFullPosition tableFullPosition; + private final JobRdbFullPosition startPosition; private static final int LIMIT = 2048; private final int flushSize; private final AtomicReference choosePrimaryKey = new AtomicReference<>(null); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static final DateTimeFormatter DATE_STAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private AtomicLong scanCount = new AtomicLong(0); + private final RateLimiter pageLimiter; + @Setter + private RateLimiter recordLimiter; public CanalFullProducer(BlockingQueue> queue, DataSource dataSource, - MySQLTableDef tableDefinition, TableFullPosition position, int flushSize) { + MySQLTableDef tableDefinition, JobRdbFullPosition startPosition, int flushSize, int pagePerSecond) { this.queue = queue; this.dataSource = dataSource; this.tableDefinition = tableDefinition; - this.position = position; + this.startPosition = startPosition; + this.tableFullPosition = JsonUtils.parseObject(startPosition.getPrimaryKeyRecords(), TableFullPosition.class); + this.scanCount.set(startPosition.getHandledRecordCount()); this.flushSize = flushSize; + this.pageLimiter = RateLimiter.create(pagePerSecond); } public void choosePrimaryKey() { for (RdbColumnDefinition col : tableDefinition.getColumnDefinitions().values()) { - if (position.getCurPrimaryKeyCols().get(col.getName()) != null) { + if (tableFullPosition.getCurPrimaryKeyCols().get(col.getName()) != null) { // random choose the first primary key from the table choosePrimaryKey.set(col.getName()); log.info("schema [{}] table [{}] choose primary key [{}]", tableDefinition.getSchemaName(), tableDefinition.getTableName(), @@ -101,8 +115,11 @@ public void start(AtomicBoolean flag) { boolean isFirstSelect = true; List> rows = new LinkedList<>(); while (flag.get()) { + // acquire a permit before each database read + pageLimiter.acquire(); + String scanSql = generateScanSql(isFirstSelect); - log.info("scan sql is [{}] , cur position [{}]", scanSql, JsonUtils.toJSONString(position.getCurPrimaryKeyCols())); + log.info("scan sql is [{}] , cur position [{}]", scanSql, JsonUtils.toJSONString(tableFullPosition.getCurPrimaryKeyCols())); try (Connection connection = dataSource.getConnection(); PreparedStatement statement = connection.prepareStatement(scanSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { @@ -119,12 +136,13 @@ public void start(AtomicBoolean flag) { } lastCol = columnValues; rows.add(lastCol); + this.scanCount.incrementAndGet(); if (rows.size() < flushSize) { continue; } refreshPosition(lastCol); // may be not reach - commitConnectRecord(rows); + commitConnectRecord(rows, false, this.scanCount.get(), startPosition); rows = new LinkedList<>(); } @@ -132,7 +150,7 @@ public void start(AtomicBoolean flag) { log.info("full scan db [{}] table [{}] finish", tableDefinition.getSchemaName(), tableDefinition.getTableName()); // commit the last record if rows.size() < flushSize - commitConnectRecord(rows); + commitConnectRecord(rows, true, this.scanCount.get(), startPosition); return; } refreshPosition(lastCol); @@ -157,26 +175,44 @@ public void start(AtomicBoolean flag) { } } - private void commitConnectRecord(List> rows) throws InterruptedException { + private void commitConnectRecord(List> rows, boolean isFinished, long migratedCount, JobRdbFullPosition position) + throws InterruptedException { if (rows == null || rows.isEmpty()) { return; } JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); - jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(tableFullPosition)); jobRdbFullPosition.setTableName(tableDefinition.getTableName()); jobRdbFullPosition.setSchema(tableDefinition.getSchemaName()); + jobRdbFullPosition.setFinished(isFinished); + jobRdbFullPosition.setHandledRecordCount(migratedCount); + jobRdbFullPosition.setMaxCount(position.getMaxCount()); + if (isFinished) { + jobRdbFullPosition.setPercent(new BigDecimal("100")); + } else { + double num = 100.0d * ((double) migratedCount) * 1.0d / (double) position.getMaxCount(); + String number = Double.toString(num); + BigDecimal percent = new BigDecimal(number).setScale(2, RoundingMode.HALF_UP); + jobRdbFullPosition.setPercent(percent); + } CanalFullRecordOffset offset = new CanalFullRecordOffset(); offset.setPosition(jobRdbFullPosition); CanalFullRecordPartition partition = new CanalFullRecordPartition(); + Map dataMap = new HashMap<>(); + dataMap.put("data", JsonUtils.toJSONString(rows)); + dataMap.put("partition", JsonUtils.toJSONString(partition)); + dataMap.put("offset", JsonUtils.toJSONString(offset)); ArrayList records = new ArrayList<>(); - byte[] rowsData = JsonUtils.toJSONString(rows).getBytes(StandardCharsets.UTF_8); - records.add(new ConnectRecord(partition, offset, System.currentTimeMillis(), rowsData)); + records.add( + new ConnectRecord(partition, offset, System.currentTimeMillis(), JsonUtils.toJSONString(dataMap).getBytes(StandardCharsets.UTF_8))); + // global limiter, 100 records per second default + recordLimiter.acquire(); queue.put(records); } private boolean checkIsScanFinish(Map lastCol) { Object lastPrimaryValue = lastCol.get(choosePrimaryKey.get()); - Object maxPrimaryValue = position.getMaxPrimaryKeyCols().get(choosePrimaryKey.get()); + Object maxPrimaryValue = tableFullPosition.getMaxPrimaryKeyCols().get(choosePrimaryKey.get()); if (lastPrimaryValue instanceof Number) { BigDecimal last = new BigDecimal(String.valueOf(lastPrimaryValue)); BigDecimal max = @@ -189,22 +225,22 @@ private boolean checkIsScanFinish(Map lastCol) { return false; } - public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throws Exception { - if (col == null || rs.wasNull()) { - return null; - } + public Object readColumn(ResultSet rs, String colName, CanalMySQLType colType) throws Exception { switch (colType) { case TINYINT: case SMALLINT: case MEDIUMINT: case INT: - Long valueLong = rs.getLong(col); + Long valueLong = rs.getLong(colName); + if (rs.wasNull()) { + return null; + } if (valueLong.compareTo((long) Integer.MAX_VALUE) > 0) { return valueLong; } return valueLong.intValue(); case BIGINT: - String v = rs.getString(col); + String v = rs.getString(colName); if (v == null) { return null; } @@ -216,16 +252,20 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case FLOAT: case DOUBLE: case DECIMAL: - return rs.getBigDecimal(col); + return rs.getBigDecimal(colName); case DATE: - return rs.getObject(col, LocalDate.class); + return rs.getObject(colName, LocalDate.class); case TIME: - return rs.getObject(col, LocalTime.class); + return rs.getObject(colName, LocalTime.class); case DATETIME: case TIMESTAMP: - return rs.getObject(col, LocalDateTime.class); + return rs.getObject(colName, LocalDateTime.class); case YEAR: - return rs.getInt(col); + int year = rs.getInt(colName); + if (rs.wasNull()) { + return null; + } + return year; case CHAR: case VARCHAR: case TINYTEXT: @@ -235,7 +275,7 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case ENUM: case SET: case JSON: - return rs.getString(col); + return rs.getString(colName); case BIT: case BINARY: case VARBINARY: @@ -243,7 +283,7 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case BLOB: case MEDIUMBLOB: case LONGBLOB: - return rs.getBytes(col); + return rs.getBytes(colName); case GEOMETRY: case GEOMETRY_COLLECTION: case GEOM_COLLECTION: @@ -253,23 +293,23 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case MULTIPOINT: case MULTILINESTRING: case MULTIPOLYGON: - byte[] geo = rs.getBytes(col); + byte[] geo = rs.getBytes(colName); if (geo == null) { return null; } return SqlUtils.toGeometry(geo); default: - return rs.getObject(col); + return rs.getObject(colName); } } private void refreshPosition(Map lastCol) { Map nextPosition = new LinkedHashMap<>(); - for (Map.Entry entry : position.getCurPrimaryKeyCols().entrySet()) { + for (Map.Entry entry : tableFullPosition.getCurPrimaryKeyCols().entrySet()) { nextPosition.put(entry.getKey(), lastCol.get(entry.getKey())); } - position.setCurPrimaryKeyCols(nextPosition); + tableFullPosition.setCurPrimaryKeyCols(nextPosition); } private void setPrepareStatementValue(PreparedStatement statement) throws SQLException { @@ -278,7 +318,7 @@ private void setPrepareStatementValue(PreparedStatement statement) throws SQLExc return; } RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName); - Object value = position.getCurPrimaryKeyCols().get(colName); + Object value = tableFullPosition.getCurPrimaryKeyCols().get(colName); String str; switch (columnDefinition.getJdbcType()) { case BIT: diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java index bd85f03240..4d3e569dcd 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java @@ -20,16 +20,15 @@ import org.apache.eventmesh.common.AbstractComponent; import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.config.connector.Config; -import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceCheckConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; -import org.apache.eventmesh.connector.canal.source.position.CanalFullPositionMgr; -import org.apache.eventmesh.connector.canal.source.position.TableFullPosition; +import org.apache.eventmesh.connector.canal.source.position.CanalCheckPositionMgr; import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; @@ -37,67 +36,86 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.RateLimiter; + import lombok.extern.slf4j.Slf4j; @Slf4j public class CanalSourceCheckConnector extends AbstractComponent implements Source, ConnectorCreateService { - private CanalSourceFullConfig config; - private CanalFullPositionMgr positionMgr; + private CanalSourceCheckConfig config; + private CanalCheckPositionMgr positionMgr; private RdbTableMgr tableMgr; private ThreadPoolExecutor executor; - private BlockingQueue> queue; + private final ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newSingleThreadScheduledExecutor(); + private final BlockingQueue> queue = new LinkedBlockingQueue<>(10000); private final AtomicBoolean flag = new AtomicBoolean(true); - private long maxPollWaitTime; + private RateLimiter globalLimiter; @Override protected void run() throws Exception { - this.tableMgr.start(); - this.positionMgr.start(); - if (positionMgr.isFinished()) { - log.info("connector [{}] has finished the job", config.getSourceConnectorConfig().getConnectorName()); - return; - } - executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-source-full")); - List producers = new LinkedList<>(); - if (config.getSourceConnectorConfig().getDatabases() != null) { - for (RdbDBDefinition db : config.getSourceConnectorConfig().getDatabases()) { - for (RdbTableDefinition table : db.getTables()) { - try { - log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName()); - RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName()); - JobRdbFullPosition position = positionMgr.getPosition(simpleTable); - if (position == null) { - throw new EventMeshException(String.format("db [%s] table [%s] have none position info", - db.getSchemaName(), table.getTableName())); - } - RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable); - if (tableDefinition == null) { - throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", - db.getSchemaName(), table.getTableName())); + scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> { + try { + this.tableMgr.start(); + } catch (Exception e) { + log.error("start tableMgr fail", e); + throw new RuntimeException(e); + } + try { + this.positionMgr.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + // if (positionMgr.isFinished()) { + // log.info("connector [{}] has finished the job", config.getSourceConnectorConfig().getConnectorName()); + // return; + // } + executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-source-check")); + List producers = new LinkedList<>(); + if (config.getSourceConnectorConfig().getDatabases() != null) { + for (RdbDBDefinition db : config.getSourceConnectorConfig().getDatabases()) { + for (RdbTableDefinition table : db.getTables()) { + try { + log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName()); + RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName()); + JobRdbFullPosition position = positionMgr.getPosition(simpleTable); + if (position == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none position info", + db.getSchemaName(), table.getTableName())); + } + RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable); + if (tableDefinition == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", + db.getSchemaName(), table.getTableName())); + } + CanalFullProducer producer = + new CanalFullProducer(queue, DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, + position, config.getFlushSize(), config.getPagePerSecond()); + producer.setRecordLimiter(globalLimiter); + producers.add(producer); + } catch (Exception e) { + log.error("create schema [{}] table [{}] producers fail", db.getSchemaName(), + table.getTableName(), e); } - - producers.add(new CanalFullProducer(queue, DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, - JsonUtils.parseObject(position.getPrimaryKeyRecords(), TableFullPosition.class), - config.getFlushSize())); - } catch (Exception e) { - log.error("create schema [{}] table [{}] producers fail", db.getSchemaName(), - table.getTableName(), e); } } } - } - producers.forEach(p -> executor.execute(() -> p.start(flag))); + producers.forEach(p -> executor.execute(() -> p.start(flag))); + + }, 0, config.getExecutePeriod(), TimeUnit.SECONDS); } @Override @@ -115,6 +133,18 @@ protected void shutdown() throws Exception { log.info("shutdown thread pool fail"); } } + if (!scheduledThreadPoolExecutor.isShutdown()) { + scheduledThreadPoolExecutor.shutdown(); + try { + if (!scheduledThreadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("wait scheduledThreadPoolExecutor shutdown timeout, it will shutdown now"); + scheduledThreadPoolExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("shutdown scheduledThreadPoolExecutor fail"); + } + } if (DatabaseConnection.sourceDataSource != null) { DatabaseConnection.sourceDataSource.close(); log.info("data source has been closed"); @@ -128,12 +158,12 @@ public Source create() { @Override public Class configClass() { - return CanalSourceFullConfig.class; + return CanalSourceCheckConfig.class; } @Override public void init(Config config) throws Exception { - this.config = (CanalSourceFullConfig) config; + this.config = (CanalSourceCheckConfig) config; init(); } @@ -141,15 +171,15 @@ private void init() { DatabaseConnection.sourceConfig = this.config.getSourceConnectorConfig(); DatabaseConnection.initSourceConnection(); this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); - this.positionMgr = new CanalFullPositionMgr(config, tableMgr); - this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime(); - this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity()); + this.positionMgr = new CanalCheckPositionMgr(config, tableMgr); + this.globalLimiter = RateLimiter.create(config.getRecordPerSecond()); } @Override public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; - this.config = (CanalSourceFullConfig) sourceConnectorContext.getSourceConfig(); + CanalSourceConfig canalSourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig(); + this.config = ConfigUtil.parse(canalSourceConfig.getSourceConfig(), CanalSourceCheckConfig.class); init(); } @@ -172,7 +202,7 @@ public void onException(ConnectRecord record) { public List poll() { while (flag.get()) { try { - List records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS); + List records = queue.poll(5, TimeUnit.SECONDS); if (records == null || records.isEmpty()) { continue; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java index 09e2e0dcf7..df28342c39 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -27,10 +27,8 @@ import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.connector.canal.source.position.CanalFullPositionMgr; -import org.apache.eventmesh.connector.canal.source.position.TableFullPosition; import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; @@ -47,6 +45,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.RateLimiter; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -56,9 +56,9 @@ public class CanalSourceFullConnector extends AbstractComponent implements Sourc private CanalFullPositionMgr positionMgr; private RdbTableMgr tableMgr; private ThreadPoolExecutor executor; - private BlockingQueue> queue; + private final BlockingQueue> queue = new LinkedBlockingQueue<>(10000); private final AtomicBoolean flag = new AtomicBoolean(true); - private long maxPollWaitTime; + private RateLimiter globalLimiter; @Override protected void run() throws Exception { @@ -87,10 +87,11 @@ protected void run() throws Exception { throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", db.getSchemaName(), table.getTableName())); } - - producers.add(new CanalFullProducer(queue, DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, - JsonUtils.parseObject(position.getPrimaryKeyRecords(), TableFullPosition.class), - config.getFlushSize())); + CanalFullProducer producer = + new CanalFullProducer(queue, DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, + position, config.getFlushSize(), config.getPagePerSecond()); + producer.setRecordLimiter(globalLimiter); + producers.add(producer); } catch (Exception e) { log.error("create schema [{}] table [{}] producers fail", db.getSchemaName(), table.getTableName(), e); @@ -138,8 +139,7 @@ private void init() { DatabaseConnection.initSourceConnection(); this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); this.positionMgr = new CanalFullPositionMgr(config, tableMgr); - this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime(); - this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity()); + this.globalLimiter = RateLimiter.create(config.getRecordPerSecond()); } @Override @@ -169,7 +169,7 @@ public void onException(ConnectRecord record) { public List poll() { while (flag.get()) { try { - List records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS); + List records = queue.poll(2, TimeUnit.SECONDS); if (records == null || records.isEmpty()) { continue; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java index 4f7041b478..c6e7603805 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java @@ -20,6 +20,9 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceIncrementConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; @@ -38,6 +41,8 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -47,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; @@ -89,6 +95,12 @@ public class CanalSourceIncrementConnector implements Source { private RdbTableMgr tableMgr; + private static final String SQL_SELECT_RDB_VERSION = "select version() as rdb_version"; + + private static final String SQL_SELECT_SERVER_UUID_IN_MARIADB = "SELECT @@global.server_id as server_uuid"; + + private static final String SQL_SHOW_SERVER_UUID_IN_MYSQL = "SELECT @@server_uuid as server_uuid"; + @Override public Class configClass() { return CanalSourceConfig.class; @@ -108,13 +120,24 @@ public void init(ConnectorContext connectorContext) throws Exception { if (sourceConnectorContext.getRecordPositionList() != null) { this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList()); } + // filter: your_database\\.your_table; .*\\..* (all database & table) + tableFilter = buildTableFilters(sourceConfig); - if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) { - tableFilter = sourceConfig.getTableFilter(); - } if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) { fieldFilter = sourceConfig.getFieldFilter(); } + DatabaseConnection.sourceConfig = sourceConfig.getSourceConnectorConfig(); + DatabaseConnection.initSourceConnection(); + + DataSourceType dataSourceType = checkRDBDataSourceType(DatabaseConnection.sourceDataSource); + String serverUUID = queryServerUUID(DatabaseConnection.sourceDataSource, dataSourceType); + if (StringUtils.isNotEmpty(serverUUID)) { + log.info("init source increment connector, serverUUID: {}", serverUUID); + sourceConfig.setServerUUID(serverUUID); + } else { + log.warn("get source data source serverUUID empty please check"); + } + tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); canalServer = CanalServerWithEmbedded.instance(); @@ -152,9 +175,74 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup return instance; } }); - DatabaseConnection.sourceConfig = sourceConfig.getSourceConnectorConfig(); - DatabaseConnection.initSourceConnection(); - tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); + } + + private String queryServerUUID(DruidDataSource sourceDataSource, DataSourceType dataSourceType) { + String serverUUID = ""; + try { + String queryServerUUIDSql; + if (DataSourceType.MariaDB.equals(dataSourceType)) { + queryServerUUIDSql = SQL_SELECT_SERVER_UUID_IN_MARIADB; + } else { + queryServerUUIDSql = SQL_SHOW_SERVER_UUID_IN_MYSQL; + } + log.info("execute sql '{}' start.", queryServerUUIDSql); + try (PreparedStatement preparedStatement = sourceDataSource.getConnection().prepareStatement(queryServerUUIDSql)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", queryServerUUIDSql, resultSet); + serverUUID = resultSet.getString("server_uuid"); + log.info("execute sql '{}',query server_uuid result:{}", queryServerUUIDSql, serverUUID); + return serverUUID; + } + } + } catch (Exception e) { + log.warn("select server_uuid failed,data source:{}", sourceDataSource, e); + throw new RuntimeException("select server_uuid failed"); + } + return serverUUID; + } + + // check is mariadb or mysql + private DataSourceType checkRDBDataSourceType(DruidDataSource sourceDataSource) { + try { + log.info("execute sql '{}' start.", SQL_SELECT_RDB_VERSION); + try (PreparedStatement preparedStatement = sourceDataSource.getConnection().prepareStatement(SQL_SELECT_RDB_VERSION)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", SQL_SELECT_RDB_VERSION, resultSet); + String rdbVersion = resultSet.getString("rdb_version"); + if (StringUtils.isNotBlank(rdbVersion)) { + if (rdbVersion.toLowerCase().contains(DataSourceType.MariaDB.getName().toLowerCase())) { + return DataSourceType.MariaDB; + } + } + } + } + } catch (Exception e) { + log.warn("select rdb version failed,data source:{}", sourceDataSource, e); + throw new RuntimeException("select rdb version failed"); + } + return DataSourceType.MYSQL; + } + + private String buildTableFilters(CanalSourceIncrementConfig sourceConfig) { + StringBuilder tableFilterBuilder = new StringBuilder(); + Set dbDefinitions = sourceConfig.getSourceConnectorConfig().getDatabases(); + for (RdbDBDefinition dbDefinition : dbDefinitions) { + Set tableDefinitions = dbDefinition.getTables(); + for (RdbTableDefinition rdbTableDefinition : tableDefinitions) { + if (tableFilterBuilder.length() > 0) { + tableFilterBuilder.append(","); + } + String dbName = rdbTableDefinition.getSchemaName(); + String tableName = rdbTableDefinition.getTableName(); + tableFilterBuilder.append(dbName); + tableFilterBuilder.append("\\."); + tableFilterBuilder.append(tableName); + } + } + return tableFilterBuilder.toString(); } private Canal buildCanal(CanalSourceIncrementConfig sourceConfig) { @@ -254,14 +342,7 @@ public void start() throws Exception { @Override public void commit(ConnectRecord record) { - long batchId = Long.parseLong(record.getExtension("messageId")); - int batchIndex = record.getExtension("batchIndex", Integer.class); - int totalBatches = record.getExtension("totalBatches", Integer.class); - if (batchIndex == totalBatches - 1) { - log.debug("ack records batchIndex:{}, totalBatches:{}, batchId:{}", - batchIndex, totalBatches, batchId); - canalServer.ack(clientIdentity, batchId); - } + } @Override @@ -362,10 +443,10 @@ public List poll() { result.add(connectRecord); } } - } else { - // for the message has been filtered need ack message - canalServer.ack(clientIdentity, message.getId()); + log.debug("message {} has been processed", message); } + log.debug("ack message, messageId {}", message.getId()); + canalServer.ack(clientIdentity, message.getId()); return result; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java new file mode 100644 index 0000000000..149c62602c --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalCheckPositionMgr.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.position; + +import org.apache.eventmesh.common.AbstractComponent; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceCheckConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CanalCheckPositionMgr extends AbstractComponent { + + private final CanalSourceCheckConfig config; + private final Map positions = new LinkedHashMap<>(); + private final RdbTableMgr tableMgr; + + public CanalCheckPositionMgr(CanalSourceCheckConfig config, RdbTableMgr tableMgr) { + this.config = config; + this.tableMgr = tableMgr; + } + + @Override + protected void run() throws Exception { + if (config == null || config.getSourceConnectorConfig() == null || config.getSourceConnectorConfig().getDatabases() == null) { + log.info("config or database is null"); + return; + } + prepareRecordPosition(); + initPositions(); + } + + public void prepareRecordPosition() { + if (config.getStartPosition() != null && !config.getStartPosition().isEmpty()) { + for (RecordPosition record : config.getStartPosition()) { + CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getRecordOffset(); + RdbSimpleTable table = new RdbSimpleTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + positions.put(table, offset.getPosition()); + } + } + } + + public JobRdbFullPosition getPosition(RdbSimpleTable table) { + return positions.get(table); + } + + public boolean isFinished() { + for (JobRdbFullPosition position : positions.values()) { + if (!position.isFinished()) { + log.info("schema [{}] table [{}] is not finish", position.getSchema(), position.getTableName()); + return false; + } + } + return true; + } + + private void initPositions() { + for (RdbDBDefinition database : config.getSourceConnectorConfig().getDatabases()) { + for (RdbTableDefinition table : database.getTables()) { + try { + RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchemaName(), table.getTableName()); + RdbTableDefinition tableDefinition; + if ((tableDefinition = tableMgr.getTable(simpleTable)) == null) { + log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName()); + continue; + } + log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName()); + + JobRdbFullPosition recordPosition = positions.get(simpleTable); + if (recordPosition == null || !recordPosition.isFinished()) { + positions.put(simpleTable, + fetchTableInfo(DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, recordPosition)); + } + } catch (Exception e) { + log.error("process schema [{}] table [{}] position fail", database.getSchemaName(), table.getTableName(), e); + } + + } + } + } + + private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef tableDefinition, JobRdbFullPosition recordPosition) + throws SQLException { + TableFullPosition position = new TableFullPosition(); + Map preMinPrimaryKeys = new LinkedHashMap<>(); + Map preMaxPrimaryKeys = new LinkedHashMap<>(); + for (String pk : tableDefinition.getPrimaryKeys()) { + Object min = fetchMinPrimaryKey(dataSource, tableDefinition, preMinPrimaryKeys, pk); + Object max = fetchMaxPrimaryKey(dataSource, tableDefinition, preMaxPrimaryKeys, pk); + preMinPrimaryKeys.put(pk, min); + preMaxPrimaryKeys.put(pk, max); + position.getCurPrimaryKeyCols().put(pk, min); + position.getMinPrimaryKeyCols().put(pk, min); + position.getMaxPrimaryKeyCols().put(pk, max); + } + JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); + if (recordPosition != null) { + if (StringUtils.isNotBlank(recordPosition.getPrimaryKeyRecords())) { + TableFullPosition record = JsonUtils.parseObject(recordPosition.getPrimaryKeyRecords(), TableFullPosition.class); + if (record != null && record.getCurPrimaryKeyCols() != null && !record.getCurPrimaryKeyCols().isEmpty()) { + position.setCurPrimaryKeyCols(record.getCurPrimaryKeyCols()); + } + } + jobRdbFullPosition.setPercent(recordPosition.getPercent()); + } + long rowCount = queryCurTableRowCount(dataSource, tableDefinition); + jobRdbFullPosition.setSchema(tableDefinition.getSchemaName()); + jobRdbFullPosition.setTableName(tableDefinition.getTableName()); + jobRdbFullPosition.setMaxCount(rowCount); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); + return jobRdbFullPosition; + } + + + private long queryCurTableRowCount(DataSource datasource, MySQLTableDef tableDefinition) throws SQLException { + String sql = "select `AVG_ROW_LENGTH`,`DATA_LENGTH` from information_schema.TABLES where `TABLE_SCHEMA`='" + tableDefinition.getSchemaName() + + "' and `TABLE_NAME`='" + tableDefinition.getTableName() + "'"; + try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { + long result = 0L; + if (resultSet.next()) { + long avgRowLength = resultSet.getLong("AVG_ROW_LENGTH"); + long dataLength = resultSet.getLong("DATA_LENGTH"); + if (avgRowLength != 0L) { + result = dataLength / avgRowLength; + } + } + return result; + } + } + + private void appendPrePrimaryKey(Map preMap, StringBuilder sql) { + if (preMap != null && !preMap.isEmpty()) { + sql.append(" WHERE "); + boolean first = true; + for (Map.Entry entry : preMap.entrySet()) { + if (first) { + first = false; + } else { + sql.append(" AND "); + } + sql.append(Constants.MySQLQuot).append(entry.getKey()).append(Constants.MySQLQuot).append("=?"); + } + } + } + + private void setValue2Statement(PreparedStatement ps, Map preMap, MySQLTableDef tableDefinition) throws SQLException { + if (preMap != null && !preMap.isEmpty()) { + int index = 1; + for (Map.Entry entry : preMap.entrySet()) { + RdbColumnDefinition def = tableDefinition.getColumnDefinitions().get(entry.getKey()); + ps.setObject(index, entry.getValue(), def.getJdbcType().getVendorTypeNumber()); + ++index; + } + } + } + + private Object fetchMinPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition, Map prePrimary, String curPrimaryKeyCol) + throws SQLException { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT MIN(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot) + .append(") min_primary_key FROM").append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot) + .append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); + appendPrePrimaryKey(prePrimary, builder); + String sql = builder.toString(); + log.info("fetch min primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + setValue2Statement(statement, prePrimary, tableDefinition); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol); + if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) { + return resultSet.getString("min_primary_key"); + } else { + return resultSet.getObject("min_primary_key"); + } + } + } + } + return null; + } + + private Object fetchMaxPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition, Map prePrimary, String curPrimaryKeyCol) + throws SQLException { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT MAX(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot) + .append(") max_primary_key FROM").append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot) + .append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); + appendPrePrimaryKey(prePrimary, builder); + String sql = builder.toString(); + log.info("fetch max primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + setValue2Statement(statement, prePrimary, tableDefinition); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol); + if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) { + return resultSet.getString("max_primary_key"); + } else { + return resultSet.getObject("max_primary_key"); + } + } + } + } + return null; + } + + + @Override + protected void shutdown() throws Exception { + + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java index 0ae1f8f8ff..dad0ddbf3b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java @@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils; +import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -153,7 +154,8 @@ private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef t private long queryCurTableRowCount(DataSource datasource, MySQLTableDef tableDefinition) throws SQLException { String sql = "select `AVG_ROW_LENGTH`,`DATA_LENGTH` from information_schema.TABLES where `TABLE_SCHEMA`='" + tableDefinition.getSchemaName() + "' and `TABLE_NAME`='" + tableDefinition.getTableName() + "'"; - try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { + try (Connection conn = datasource.getConnection(); Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery(sql)) { long result = 0L; if (resultSet.next()) { long avgRowLength = resultSet.getLong("AVG_ROW_LENGTH"); @@ -201,7 +203,7 @@ private Object fetchMinPrimaryKey(DataSource dataSource, MySQLTableDef tableDefi appendPrePrimaryKey(prePrimary, builder); String sql = builder.toString(); log.info("fetch min primary sql [{}]", sql); - try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + try (Connection conn = dataSource.getConnection(); PreparedStatement statement = conn.prepareStatement(sql)) { setValue2Statement(statement, prePrimary, tableDefinition); try (ResultSet resultSet = statement.executeQuery()) { if (resultSet.next()) { @@ -226,7 +228,7 @@ private Object fetchMaxPrimaryKey(DataSource dataSource, MySQLTableDef tableDefi appendPrePrimaryKey(prePrimary, builder); String sql = builder.toString(); log.info("fetch max primary sql [{}]", sql); - try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + try (Connection conn = dataSource.getConnection(); PreparedStatement statement = conn.prepareStatement(sql)) { setValue2Statement(statement, prePrimary, tableDefinition); try (ResultSet resultSet = statement.executeQuery()) { if (resultSet.next()) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java index de7a45dc99..954b81ca70 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java @@ -27,13 +27,13 @@ import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.connector.canal.SqlUtils; +import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -49,6 +49,7 @@ @Slf4j public class RdbTableMgr extends AbstractComponent { + private final JdbcConfig config; private final Map tables = new HashMap<>(); private final DataSource dataSource; @@ -85,7 +86,7 @@ protected void run() { if (primaryKeys == null || primaryKeys.isEmpty() || primaryKeys.get(table.getTableName()) == null) { log.warn("init db [{}] table [{}] info, and primary keys are empty", db.getSchemaName(), table.getTableName()); } else { - mysqlTable.setPrimaryKeys(new HashSet<>(primaryKeys.get(table.getTableName()))); + mysqlTable.setPrimaryKeys(primaryKeys.get(table.getTableName())); } if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) { log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(), table.getTableName()); @@ -116,25 +117,26 @@ private Map> queryTablePrimaryKey(String schema, List { - if (v == null) { - v = new LinkedList<>(); - } - v.add(colName); - return v; - }); + try (ResultSet rs = statement.executeQuery()) { + if (rs == null) { + return null; + } + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + String colName = rs.getString("COLUMN_NAME"); + primaryKeys.compute(tableName, (k, v) -> { + if (v == null) { + v = new LinkedList<>(); + } + v.add(colName); + return v; + }); + } } - resultSet.close(); } return primaryKeys; } @@ -146,22 +148,27 @@ private Map> queryColumns(String schema, List> cols = new LinkedHashMap<>(); - try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + Connection conn = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + conn = dataSource.getConnection(); + statement = conn.prepareStatement(sql); statement.setString(1, schema); SqlUtils.setInClauseParameters(statement, 2, tables); - ResultSet resultSet = statement.executeQuery(); - if (resultSet == null) { + rs = statement.executeQuery(); + if (rs == null) { return null; } - while (resultSet.next()) { - String dataType = resultSet.getString("DATA_TYPE"); + while (rs.next()) { + String dataType = rs.getString("DATA_TYPE"); JDBCType jdbcType = SqlUtils.toJDBCType(dataType); MySQLColumnDef col = new MySQLColumnDef(); col.setJdbcType(jdbcType); col.setType(CanalMySQLType.valueOfCode(dataType)); - String colName = resultSet.getString("COLUMN_NAME"); + String colName = rs.getString("COLUMN_NAME"); col.setName(colName); - String tableName = resultSet.getString("TABLE_NAME"); + String tableName = rs.getString("TABLE_NAME"); cols.compute(tableName, (k, v) -> { if (v == null) { v = new LinkedList<>(); @@ -170,7 +177,30 @@ private Map> queryColumns(String schema, List