diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java index 257822810e..e14ea78c82 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java @@ -1,10 +1,10 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; -import com.mysql.cj.MysqlType; - import java.util.HashMap; import java.util.Map; +import com.mysql.cj.MysqlType; + public enum CanalMySQLType { BIT("BIT"), TINYINT("TINYINT"), @@ -61,6 +61,29 @@ public enum CanalMySQLType { } } + public String genPrepareStatement4Insert() { + switch (this) { + case GEOMETRY: + case GEOM_COLLECTION: + case GEOMETRY_COLLECTION: + return "ST_GEOMFROMTEXT(?)"; + case POINT: + return "ST_PointFromText(?)"; + case LINESTRING: + return "ST_LineStringFromText(?)"; + case POLYGON: + return "ST_PolygonFromText(?)"; + case MULTIPOINT: + return "ST_MultiPointFromText(?)"; + case MULTILINESTRING: + return "ST_MultiLineStringFromText(?)"; + case MULTIPOLYGON: + return "ST_MultiPolygonFromText(?)"; + default: + return "?"; + } + } + public static CanalMySQLType valueOfCode(String code) { CanalMySQLType type = TYPES.get(code.toUpperCase()); if (type != null) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java index 27afd22735..05b6b1836f 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java @@ -52,6 +52,7 @@ import org.locationtech.jts.geom.GeometryFactory; import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKTReader; import com.mysql.cj.Constants; import com.mysql.cj.MysqlType; @@ -62,8 +63,8 @@ public class SqlUtils { public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " "; private static final Map> sqlTypeToJavaTypeMap = new HashMap>(); private static final ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); - private static final WKBReader WKB_READER = new WKBReader(new GeometryFactory()); private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + private static final WKBReader WKB_READER = new WKBReader(GEOMETRY_FACTORY); private static final BigDecimal NANO_SEC = new BigDecimal(LogBuffer.DIG_BASE); private static final LocalDateTime BASE = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0); private static final long ONE_HOUR = 3600; @@ -379,7 +380,7 @@ public static BigDecimal toBigDecimal(Object value) { return BigDecimal.valueOf(((Float) value).doubleValue()); } if (value instanceof BigInteger) { - return new BigDecimal((BigInteger)value); + return new BigDecimal((BigInteger) value); } if (value instanceof Byte) { return BigDecimal.valueOf(((Byte) value).longValue()); @@ -540,24 +541,29 @@ private static LocalDateTime toLocalDateTime(String value) { String[] secondParts = timeParts[2].split("\\."); secondParts[1] = StringUtils.rightPad(secondParts[1], 9, Constants.CJ_MINOR_VERSION); return LocalDateTime.of(Integer.parseInt(dataParts2[0]), Integer.parseInt(dataParts2[1]), Integer.parseInt(dataParts2[2]), - Integer.parseInt(timeParts[0]), Integer.parseInt(timeParts[1]), Integer.parseInt(secondParts[0]), Integer.parseInt(secondParts[1])); + Integer.parseInt(timeParts[0]), Integer.parseInt(timeParts[1]), Integer.parseInt(secondParts[0]), + Integer.parseInt(secondParts[1])); case 10: String[] dataParts3 = dateStr2.split("-"); - return LocalDateTime.of(Integer.parseInt(dataParts3[0]), Integer.parseInt(dataParts3[1]), Integer.parseInt(dataParts3[2]), 0, 0, 0, 0); + return LocalDateTime.of(Integer.parseInt(dataParts3[0]), Integer.parseInt(dataParts3[1]), Integer.parseInt(dataParts3[2]), 0, + 0, 0, 0); case 13: String[] dataTime2 = dateStr2.split(" "); String[] dataParts4 = dataTime2[0].split("-"); - return LocalDateTime.of(Integer.parseInt(dataParts4[0]), Integer.parseInt(dataParts4[1]), Integer.parseInt(dataParts4[2]), Integer.parseInt(dataTime2[1]), 0, 0, 0); + return LocalDateTime.of(Integer.parseInt(dataParts4[0]), Integer.parseInt(dataParts4[1]), Integer.parseInt(dataParts4[2]), + Integer.parseInt(dataTime2[1]), 0, 0, 0); case 16: String[] dataTime3 = dateStr2.split(" "); String[] dataParts5 = dataTime3[0].split("-"); String[] timeParts2 = dataTime3[1].split(":"); - return LocalDateTime.of(Integer.parseInt(dataParts5[0]), Integer.parseInt(dataParts5[1]), Integer.parseInt(dataParts5[2]), Integer.parseInt(timeParts2[0]), Integer.parseInt(timeParts2[1]), 0, 0); + return LocalDateTime.of(Integer.parseInt(dataParts5[0]), Integer.parseInt(dataParts5[1]), Integer.parseInt(dataParts5[2]), + Integer.parseInt(timeParts2[0]), Integer.parseInt(timeParts2[1]), 0, 0); case 19: String[] dataTime4 = dateStr2.split(" "); String[] dataParts6 = dataTime4[0].split("-"); String[] timeParts3 = dataTime4[1].split(":"); - return LocalDateTime.of(Integer.parseInt(dataParts6[0]), Integer.parseInt(dataParts6[1]), Integer.parseInt(dataParts6[2]), Integer.parseInt(timeParts3[0]), Integer.parseInt(timeParts3[1]), Integer.parseInt(timeParts3[2]), 0); + return LocalDateTime.of(Integer.parseInt(dataParts6[0]), Integer.parseInt(dataParts6[1]), Integer.parseInt(dataParts6[2]), + Integer.parseInt(timeParts3[0]), Integer.parseInt(timeParts3[1]), Integer.parseInt(timeParts3[2]), 0); } } else if (dateStr2.charAt(2) == ':') { switch (len) { @@ -566,12 +572,14 @@ private static LocalDateTime toLocalDateTime(String value) { return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts4[0]), Integer.parseInt(timeParts4[1]), 0, 0); case 8: String[] timeParts5 = dateStr2.split(":"); - return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts5[0]), Integer.parseInt(timeParts5[1]), Integer.parseInt(timeParts5[2]), 0); + return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts5[0]), Integer.parseInt(timeParts5[1]), + Integer.parseInt(timeParts5[2]), 0); default: String[] timeParts6 = dateStr2.split(":"); String[] secondParts2 = timeParts6[2].split("\\."); secondParts2[1] = StringUtils.rightPad(secondParts2[1], 9, Constants.CJ_MINOR_VERSION); - return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts6[0]), Integer.parseInt(timeParts6[1]), Integer.parseInt(secondParts2[0]), Integer.parseInt(secondParts2[1])); + return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts6[0]), Integer.parseInt(timeParts6[1]), + Integer.parseInt(secondParts2[0]), Integer.parseInt(secondParts2[1])); } } else { throw new UnsupportedOperationException(value.getClass() + ", value '" + value + "' , parse to local date time failed."); @@ -627,14 +635,17 @@ public static String convertToString(Object value) { } if (value instanceof Timestamp) { long nanos = ((Timestamp) value).getNanos(); - value = Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()).toLocalDateTime(); + value = Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()) + .toLocalDateTime(); } else if (value instanceof Date) { value = ((Date) value).toLocalDate().atTime(0, 0); } else if (value instanceof Time) { - value = LocalDateTime.of(LocalDate.of(1970, 1, 1), Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); + value = LocalDateTime.of(LocalDate.of(1970, 1, 1), + Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); } else if (value instanceof java.util.Date) { value = ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); - } if (value instanceof LocalDateTime) { + } + if (value instanceof LocalDateTime) { return coverLocalDateTime2String((LocalDateTime) value); } else if (value instanceof OffsetDateTime) { OffsetDateTime zone = (OffsetDateTime) value; @@ -664,7 +675,8 @@ private static String coverLocalDateTime2String(LocalDateTime localDateTime) { int second = localTime.getSecond(); int nano = localTime.getNano(); return nano == 0 ? String.format("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second) : - String.format("%04d-%02d-%02d %02d:%02d:%02d.%s", year, month, day, hour, minute, second, new BigDecimal(nano).divide(NANO_SEC).toPlainString().substring(2)); + String.format("%04d-%02d-%02d %02d:%02d:%02d.%s", year, month, day, hour, minute, second, + new BigDecimal(nano).divide(NANO_SEC).toPlainString().substring(2)); } public static String toMySqlTime(Object value) { @@ -775,14 +787,16 @@ public static LocalDateTime toLocalDateTime(Object value) { } else { if (value instanceof Timestamp) { long nanos = ((Timestamp) value).getNanos(); - return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()).toLocalDateTime(); + return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()) + .toLocalDateTime(); } else if (value instanceof java.sql.Date) { return ((java.sql.Date) value).toLocalDate().atTime(0, 0); } else { if (!(value instanceof Time)) { return ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); } - return LocalDateTime.of(LocalDate.of(1970, 1, 1), Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); + return LocalDateTime.of(LocalDate.of(1970, 1, 1), + Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); } } } @@ -796,7 +810,9 @@ public static boolean isHexNumber(String str) { while (true) { if (i < str.length()) { char cc = str.charAt(i); - if (cc != '0' && cc != '1' && cc != '2' && cc != '3' && cc != '4' && cc != '5' && cc != '6' && cc != '7' && cc != '8' && cc != '9' && cc != 'A' && cc != 'B' && cc != 'C' && cc != 'D' && cc != 'E' && cc != 'F' && cc != 'a' && cc != 'b' && cc != 'c' && cc != 'd' && cc != 'e' && cc != 'f') { + if (cc != '0' && cc != '1' && cc != '2' && cc != '3' && cc != '4' && cc != '5' && cc != '6' && cc != '7' && cc != '8' && cc != '9' && + cc != 'A' && cc != 'B' && cc != 'C' && cc != 'D' && cc != 'E' && cc != 'F' && cc != 'a' && cc != 'b' && cc != 'c' && cc != 'd' && + cc != 'e' && cc != 'f') { flag = false; break; } @@ -834,9 +850,16 @@ public static String toGeometry(Object value) throws Exception { if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) { return (String) value; } - return new WKBReader(GEOMETRY_FACTORY).read(hex2bytes(strVal.substring(2))).toText(); + return new WKTReader().read((String) value).toText(); } else if (value instanceof byte[]) { - return new WKBReader(GEOMETRY_FACTORY).read((byte[]) value).toText(); + // mysql add 4 byte in header of geometry + byte[] bytes = (byte[]) value; + if (bytes.length > 4) { + byte[] dst = new byte[bytes.length - 4]; + System.arraycopy(bytes, 4, dst, 0, bytes.length - 4); + return new WKBReader().read(dst).toText(); + } + return new WKBReader().read(bytes).toText(); } else { throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , " + "parse to geometry failed."); @@ -844,8 +867,9 @@ public static String toGeometry(Object value) throws Exception { } public static byte[] hex2bytes(String hexStr) { - if (hexStr == null) + if (hexStr == null) { return null; + } if (org.apache.commons.lang3.StringUtils.isBlank(hexStr)) { return new byte[0]; } @@ -860,64 +884,47 @@ public static byte[] hex2bytes(String hexStr) { int index = i * 2; char c1 = hexStr.charAt(index); char c2 = hexStr.charAt(index + 1); - ret[i] = (byte) ((byte) c1 << 4); - ret[i] = (byte) (ret[i] | (byte) (c2)); + ret[i] = (byte) (toByte(c1) << 4); + ret[i] = (byte) (ret[i] | toByte(c2)); } return ret; } private static byte toByte(char src) { - switch (Character.toUpperCase(src)) { - case '0': - return 0; - case '1': - return 1; - case '2': - return 2; - case '3': - return 3; - case '4': - return 4; - case '5': - return 5; - case '6': - return 6; - case '7': - return 7; - case '8': - return 8; - case '9': - return 9; - case 'A': - return 10; - case 'B': - return 11; - case 'C': - return 12; - case 'D': - return 13; - case 'E': - return 14; - case 'F': - return 15; - } - throw new IllegalStateException("0-F"); - } - - public static String toGisWKT(Object value) throws Exception { - if (value == null) { - return null; - } - if (value instanceof String) { - String strVal = (String) value; - if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) { - return (String) value; - } - return WKB_READER.read(hex2bytes(strVal.substring(2))).toText(); - } else if (value instanceof byte[]) { - return WKB_READER.read((byte[]) value).toText(); - } else { - throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to gis failed."); - } + switch (Character.toUpperCase(src)) { + case '0': + return 0; + case '1': + return 1; + case '2': + return 2; + case '3': + return 3; + case '4': + return 4; + case '5': + return 5; + case '6': + return 6; + case '7': + return 7; + case '8': + return 8; + case '9': + return 9; + case 'A': + return 10; + case 'B': + return 11; + case 'C': + return 12; + case 'D': + return 13; + case 'E': + return 14; + case 'F': + return 15; + } + throw new IllegalStateException("0-F"); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java index da6245440b..0508b291b9 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -1,7 +1,5 @@ package org.apache.eventmesh.connector.canal.sink.connector; -import com.alibaba.druid.pool.DruidPooledConnection; -import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; @@ -32,7 +30,6 @@ import java.util.concurrent.locks.LockSupport; import com.alibaba.druid.pool.DruidPooledConnection; -import com.mysql.cj.exceptions.MysqlErrorNumbers; import lombok.extern.slf4j.Slf4j; @@ -41,6 +38,7 @@ public class CanalSinkFullConnector implements Sink, ConnectorCreateService sinkRecords) { return; } ConnectRecord record = sinkRecords.get(0); - List> data = (List>)record.getData(); + List> data = (List>) record.getData(); if (data == null || data.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] got rows data is none", this.getClass()); @@ -117,7 +116,7 @@ public void put(List sinkRecords) { return; } - MySQLTableDef tableDefinition = (MySQLTableDef)tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); if (tableDefinition == null) { log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); return; @@ -125,24 +124,53 @@ public void put(List sinkRecords) { List cols = new ArrayList<>(tableDefinition.getColumnDefinitions().values()); String sql = generateInsertPrepareSql(offset.getPosition().getSchema(), offset.getPosition().getTableName(), cols); - - try(DruidPooledConnection connection = DatabaseConnection.sinkDataSource.getConnection();PreparedStatement statement = - connection.prepareStatement(sql)) { + DruidPooledConnection connection = null; + PreparedStatement statement = null; + try { + connection = DatabaseConnection.sinkDataSource.getConnection(); + statement = + connection.prepareStatement(sql); for (Map col : data) { setPrepareParams(statement, col, cols); + log.info("insert sql {}", statement.toString()); statement.addBatch(); } statement.executeBatch(); } catch (SQLException e) { - log.warn("sink connector write fail", e); + log.warn("full sink process schema [{}] table [{}] connector write fail", tableDefinition.getSchemaName(),tableDefinition.getTableName(), + e); LockSupport.parkNanos(3000 * 1000L); } catch (Exception e) { - log.error("", e); - // todo rollback + log.error("full sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + try { + if (connection != null && !connection.isClosed()) { + connection.rollback(); + } + } catch (SQLException rollback) { + log.warn("full sink process schema [{}] table [{}] rollback fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + log.info("close prepare statement fail", e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.info("close db connection fail", e); + } + } } } - private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { + private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { for (int i = 0; i < columnDefs.size(); i++) { writeColumn(preparedStatement, i + 1, columnDefs.get(i), col.get(columnDefs.get(i).getName())); } @@ -305,23 +333,18 @@ public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, case GEOMETRY: case GEOMETRY_COLLECTION: case GEOM_COLLECTION: - String geoValue = SqlUtils.toGisWKT(value); - if (geoValue == null) { - ps.setNull(index, Types.VARCHAR); - return; - } - if (geoValue.length() >= 5 && StringUtils.startsWithIgnoreCase(geoValue.substring(0, 5), "SRID=")) { - geoValue = geoValue.substring(geoValue.indexOf(59) + 1); - } - ps.setString(index, geoValue); - return; case POINT: case LINESTRING: case POLYGON: case MULTIPOINT: case MULTILINESTRING: case MULTIPOLYGON: - ps.setNull(index, MysqlErrorNumbers.ER_INVALID_GROUP_FUNC_USE); + String geoValue = SqlUtils.toGeometry(value); + if (geoValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } + ps.setString(index, geoValue); return; default: throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); @@ -348,7 +371,7 @@ private String generateInsertPrepareSql(String schema, String table, List columnValues = new LinkedHashMap<>(); for (Map.Entry col : - tableDefinition.getColumnDefinitions().entrySet()) { + tableDefinition.getColumnDefinitions().entrySet()) { columnValues.put(col.getKey(), readColumn(resultSet, col.getKey(), - col.getValue().getType())); + col.getValue().getType())); } lastCol = columnValues; rows.add(lastCol); @@ -109,24 +109,25 @@ public void start(AtomicBoolean flag) { if (lastCol == null || checkIsScanFinish(lastCol)) { log.info("full scan db [{}] table [{}] finish", tableDefinition.getSchemaName(), - tableDefinition.getTableName()); + tableDefinition.getTableName()); commitConnectRecord(rows); return; } refreshPosition(lastCol); } catch (InterruptedException ignore) { log.info("full scan db [{}] table [{}] interrupted", tableDefinition.getSchemaName(), - tableDefinition.getTableName()); + tableDefinition.getTableName()); Thread.currentThread().interrupt(); return; } } catch (SQLException e) { - log.error("catch SQLException fail", e); + log.error("full source process schema [{}] table [{}] catch SQLException fail",tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); LockSupport.parkNanos(3000 * 1000L); } catch (Exception e) { - log.error("process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), - tableDefinition.getTableName(), e); - LockSupport.parkNanos(3000 * 1000L); + log.error("full source process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + return; } if (isFirstSelect) { isFirstSelect = false; @@ -156,7 +157,7 @@ private boolean checkIsScanFinish(Map lastCol) { if (lastPrimaryValue instanceof Number) { BigDecimal last = new BigDecimal(String.valueOf(lastPrimaryValue)); BigDecimal max = - new BigDecimal(String.valueOf(maxPrimaryValue)); + new BigDecimal(String.valueOf(maxPrimaryValue)); return last.compareTo(max) > 0; } if (lastPrimaryValue instanceof Comparable) { @@ -166,17 +167,15 @@ private boolean checkIsScanFinish(Map lastCol) { } public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throws Exception { + if (col == null || rs.wasNull()) { + return null; + } switch (colType) { case TINYINT: case SMALLINT: case MEDIUMINT: case INT: - Long uLong; - if (rs.wasNull()) { - return null; - } else { - uLong = rs.getLong(col); - } + Long uLong = rs.getLong(col); if (uLong.compareTo((long) Integer.MAX_VALUE) > 0) { return uLong; } @@ -203,9 +202,6 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case TIMESTAMP: return rs.getObject(col, LocalDateTime.class); case YEAR: - if (rs.wasNull()) { - return null; - } return rs.getInt(col); case CHAR: case VARCHAR: @@ -226,11 +222,6 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case LONGBLOB: return rs.getBytes(col); case GEOMETRY: - String geo = rs.getString(col); - if (col == null) { - return null; - } - return SqlUtils.toGeometry("0x" + geo); case GEOMETRY_COLLECTION: case GEOM_COLLECTION: case POINT: @@ -239,14 +230,17 @@ public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throw case MULTIPOINT: case MULTILINESTRING: case MULTIPOLYGON: - return null; + byte[] geo = rs.getBytes(col); + if (geo == null) { + return null; + } + return SqlUtils.toGeometry(geo); default: return rs.getObject(col); } } - private void refreshPosition(Map lastCol) { Map nextPosition = new LinkedHashMap<>(); for (Map.Entry entry : position.getCurPrimaryKeyCols().entrySet()) { @@ -335,7 +329,6 @@ private void setPrepareStatementValue(PreparedStatement statement) throws SQLExc } - private void generateQueryColumnsSql(StringBuilder builder, Collection rdbColDefs) { if (rdbColDefs == null || rdbColDefs.isEmpty()) { builder.append("*"); @@ -373,15 +366,15 @@ private String generateScanSql(boolean isFirst) { private void buildWhereSql(StringBuilder builder, boolean isEquals) { builder.append(" where ") - .append(Constants.MySQLQuot) - .append(choosePrimaryKey.get()) - .append(Constants.MySQLQuot); + .append(Constants.MySQLQuot) + .append(choosePrimaryKey.get()) + .append(Constants.MySQLQuot); if (isEquals) { builder.append(" >= ? "); } else { builder.append(" > ? "); } builder.append(" order by ").append(Constants.MySQLQuot).append(choosePrimaryKey.get()).append(Constants.MySQLQuot) - .append(" asc "); + .append(" asc "); } }