Skip to content

Commit

Permalink
close to finish full read and begin full write
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed Jul 23, 2024
1 parent 3a08039 commit 37e7078
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import com.mysql.cj.MysqlType;

import java.util.HashMap;
import java.util.Map;

import com.mysql.cj.MysqlType;

public enum CanalMySQLType {
BIT("BIT"),
TINYINT("TINYINT"),
Expand Down Expand Up @@ -61,6 +61,29 @@ public enum CanalMySQLType {
}
}

public String genPrepareStatement4Insert() {
switch (this) {
case GEOMETRY:
case GEOM_COLLECTION:
case GEOMETRY_COLLECTION:
return "ST_GEOMFROMTEXT(?)";
case POINT:
return "ST_PointFromText(?)";
case LINESTRING:
return "ST_LineStringFromText(?)";
case POLYGON:
return "ST_PolygonFromText(?)";
case MULTIPOINT:
return "ST_MultiPointFromText(?)";
case MULTILINESTRING:
return "ST_MultiLineStringFromText(?)";
case MULTIPOLYGON:
return "ST_MultiPolygonFromText(?)";
default:
return "?";
}
}

public static CanalMySQLType valueOfCode(String code) {
CanalMySQLType type = TYPES.get(code.toUpperCase());
if (type != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKTReader;

import com.mysql.cj.Constants;
import com.mysql.cj.MysqlType;
Expand All @@ -62,8 +63,8 @@ public class SqlUtils {
public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " ";
private static final Map<Integer, Class<?>> sqlTypeToJavaTypeMap = new HashMap<Integer, Class<?>>();
private static final ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
private static final WKBReader WKB_READER = new WKBReader(new GeometryFactory());
private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
private static final WKBReader WKB_READER = new WKBReader(GEOMETRY_FACTORY);
private static final BigDecimal NANO_SEC = new BigDecimal(LogBuffer.DIG_BASE);
private static final LocalDateTime BASE = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
private static final long ONE_HOUR = 3600;
Expand Down Expand Up @@ -379,7 +380,7 @@ public static BigDecimal toBigDecimal(Object value) {
return BigDecimal.valueOf(((Float) value).doubleValue());
}
if (value instanceof BigInteger) {
return new BigDecimal((BigInteger)value);
return new BigDecimal((BigInteger) value);
}
if (value instanceof Byte) {
return BigDecimal.valueOf(((Byte) value).longValue());
Expand Down Expand Up @@ -540,24 +541,29 @@ private static LocalDateTime toLocalDateTime(String value) {
String[] secondParts = timeParts[2].split("\\.");
secondParts[1] = StringUtils.rightPad(secondParts[1], 9, Constants.CJ_MINOR_VERSION);
return LocalDateTime.of(Integer.parseInt(dataParts2[0]), Integer.parseInt(dataParts2[1]), Integer.parseInt(dataParts2[2]),
Integer.parseInt(timeParts[0]), Integer.parseInt(timeParts[1]), Integer.parseInt(secondParts[0]), Integer.parseInt(secondParts[1]));
Integer.parseInt(timeParts[0]), Integer.parseInt(timeParts[1]), Integer.parseInt(secondParts[0]),
Integer.parseInt(secondParts[1]));
case 10:
String[] dataParts3 = dateStr2.split("-");
return LocalDateTime.of(Integer.parseInt(dataParts3[0]), Integer.parseInt(dataParts3[1]), Integer.parseInt(dataParts3[2]), 0, 0, 0, 0);
return LocalDateTime.of(Integer.parseInt(dataParts3[0]), Integer.parseInt(dataParts3[1]), Integer.parseInt(dataParts3[2]), 0,
0, 0, 0);
case 13:
String[] dataTime2 = dateStr2.split(" ");
String[] dataParts4 = dataTime2[0].split("-");
return LocalDateTime.of(Integer.parseInt(dataParts4[0]), Integer.parseInt(dataParts4[1]), Integer.parseInt(dataParts4[2]), Integer.parseInt(dataTime2[1]), 0, 0, 0);
return LocalDateTime.of(Integer.parseInt(dataParts4[0]), Integer.parseInt(dataParts4[1]), Integer.parseInt(dataParts4[2]),
Integer.parseInt(dataTime2[1]), 0, 0, 0);
case 16:
String[] dataTime3 = dateStr2.split(" ");
String[] dataParts5 = dataTime3[0].split("-");
String[] timeParts2 = dataTime3[1].split(":");
return LocalDateTime.of(Integer.parseInt(dataParts5[0]), Integer.parseInt(dataParts5[1]), Integer.parseInt(dataParts5[2]), Integer.parseInt(timeParts2[0]), Integer.parseInt(timeParts2[1]), 0, 0);
return LocalDateTime.of(Integer.parseInt(dataParts5[0]), Integer.parseInt(dataParts5[1]), Integer.parseInt(dataParts5[2]),
Integer.parseInt(timeParts2[0]), Integer.parseInt(timeParts2[1]), 0, 0);
case 19:
String[] dataTime4 = dateStr2.split(" ");
String[] dataParts6 = dataTime4[0].split("-");
String[] timeParts3 = dataTime4[1].split(":");
return LocalDateTime.of(Integer.parseInt(dataParts6[0]), Integer.parseInt(dataParts6[1]), Integer.parseInt(dataParts6[2]), Integer.parseInt(timeParts3[0]), Integer.parseInt(timeParts3[1]), Integer.parseInt(timeParts3[2]), 0);
return LocalDateTime.of(Integer.parseInt(dataParts6[0]), Integer.parseInt(dataParts6[1]), Integer.parseInt(dataParts6[2]),
Integer.parseInt(timeParts3[0]), Integer.parseInt(timeParts3[1]), Integer.parseInt(timeParts3[2]), 0);
}
} else if (dateStr2.charAt(2) == ':') {
switch (len) {
Expand All @@ -566,12 +572,14 @@ private static LocalDateTime toLocalDateTime(String value) {
return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts4[0]), Integer.parseInt(timeParts4[1]), 0, 0);
case 8:
String[] timeParts5 = dateStr2.split(":");
return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts5[0]), Integer.parseInt(timeParts5[1]), Integer.parseInt(timeParts5[2]), 0);
return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts5[0]), Integer.parseInt(timeParts5[1]),
Integer.parseInt(timeParts5[2]), 0);
default:
String[] timeParts6 = dateStr2.split(":");
String[] secondParts2 = timeParts6[2].split("\\.");
secondParts2[1] = StringUtils.rightPad(secondParts2[1], 9, Constants.CJ_MINOR_VERSION);
return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts6[0]), Integer.parseInt(timeParts6[1]), Integer.parseInt(secondParts2[0]), Integer.parseInt(secondParts2[1]));
return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts6[0]), Integer.parseInt(timeParts6[1]),
Integer.parseInt(secondParts2[0]), Integer.parseInt(secondParts2[1]));
}
} else {
throw new UnsupportedOperationException(value.getClass() + ", value '" + value + "' , parse to local date time failed.");
Expand Down Expand Up @@ -627,14 +635,17 @@ public static String convertToString(Object value) {
}
if (value instanceof Timestamp) {
long nanos = ((Timestamp) value).getNanos();
value = Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()).toLocalDateTime();
value = Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault())
.toLocalDateTime();
} else if (value instanceof Date) {
value = ((Date) value).toLocalDate().atTime(0, 0);
} else if (value instanceof Time) {
value = LocalDateTime.of(LocalDate.of(1970, 1, 1), Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime());
value = LocalDateTime.of(LocalDate.of(1970, 1, 1),
Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime());
} else if (value instanceof java.util.Date) {
value = ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
} if (value instanceof LocalDateTime) {
}
if (value instanceof LocalDateTime) {
return coverLocalDateTime2String((LocalDateTime) value);
} else if (value instanceof OffsetDateTime) {
OffsetDateTime zone = (OffsetDateTime) value;
Expand Down Expand Up @@ -664,7 +675,8 @@ private static String coverLocalDateTime2String(LocalDateTime localDateTime) {
int second = localTime.getSecond();
int nano = localTime.getNano();
return nano == 0 ? String.format("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second) :
String.format("%04d-%02d-%02d %02d:%02d:%02d.%s", year, month, day, hour, minute, second, new BigDecimal(nano).divide(NANO_SEC).toPlainString().substring(2));
String.format("%04d-%02d-%02d %02d:%02d:%02d.%s", year, month, day, hour, minute, second,
new BigDecimal(nano).divide(NANO_SEC).toPlainString().substring(2));
}

public static String toMySqlTime(Object value) {
Expand Down Expand Up @@ -775,14 +787,16 @@ public static LocalDateTime toLocalDateTime(Object value) {
} else {
if (value instanceof Timestamp) {
long nanos = ((Timestamp) value).getNanos();
return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault())
.toLocalDateTime();
} else if (value instanceof java.sql.Date) {
return ((java.sql.Date) value).toLocalDate().atTime(0, 0);
} else {
if (!(value instanceof Time)) {
return ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
}
return LocalDateTime.of(LocalDate.of(1970, 1, 1), Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime());
return LocalDateTime.of(LocalDate.of(1970, 1, 1),
Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime());
}
}
}
Expand All @@ -796,7 +810,9 @@ public static boolean isHexNumber(String str) {
while (true) {
if (i < str.length()) {
char cc = str.charAt(i);
if (cc != '0' && cc != '1' && cc != '2' && cc != '3' && cc != '4' && cc != '5' && cc != '6' && cc != '7' && cc != '8' && cc != '9' && cc != 'A' && cc != 'B' && cc != 'C' && cc != 'D' && cc != 'E' && cc != 'F' && cc != 'a' && cc != 'b' && cc != 'c' && cc != 'd' && cc != 'e' && cc != 'f') {
if (cc != '0' && cc != '1' && cc != '2' && cc != '3' && cc != '4' && cc != '5' && cc != '6' && cc != '7' && cc != '8' && cc != '9' &&
cc != 'A' && cc != 'B' && cc != 'C' && cc != 'D' && cc != 'E' && cc != 'F' && cc != 'a' && cc != 'b' && cc != 'c' && cc != 'd' &&
cc != 'e' && cc != 'f') {
flag = false;
break;
}
Expand Down Expand Up @@ -834,18 +850,26 @@ public static String toGeometry(Object value) throws Exception {
if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) {
return (String) value;
}
return new WKBReader(GEOMETRY_FACTORY).read(hex2bytes(strVal.substring(2))).toText();
return new WKTReader().read((String) value).toText();
} else if (value instanceof byte[]) {
return new WKBReader(GEOMETRY_FACTORY).read((byte[]) value).toText();
// mysql add 4 byte in header of geometry
byte[] bytes = (byte[]) value;
if (bytes.length > 4) {
byte[] dst = new byte[bytes.length - 4];
System.arraycopy(bytes, 4, dst, 0, bytes.length - 4);
return new WKBReader().read(dst).toText();
}
return new WKBReader().read(bytes).toText();
} else {
throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , " +
"parse to geometry failed.");
}
}

public static byte[] hex2bytes(String hexStr) {
if (hexStr == null)
if (hexStr == null) {
return null;
}
if (org.apache.commons.lang3.StringUtils.isBlank(hexStr)) {
return new byte[0];
}
Expand All @@ -860,64 +884,47 @@ public static byte[] hex2bytes(String hexStr) {
int index = i * 2;
char c1 = hexStr.charAt(index);
char c2 = hexStr.charAt(index + 1);
ret[i] = (byte) ((byte) c1 << 4);
ret[i] = (byte) (ret[i] | (byte) (c2));
ret[i] = (byte) (toByte(c1) << 4);
ret[i] = (byte) (ret[i] | toByte(c2));
}
return ret;
}

private static byte toByte(char src) {
switch (Character.toUpperCase(src)) {
case '0':
return 0;
case '1':
return 1;
case '2':
return 2;
case '3':
return 3;
case '4':
return 4;
case '5':
return 5;
case '6':
return 6;
case '7':
return 7;
case '8':
return 8;
case '9':
return 9;
case 'A':
return 10;
case 'B':
return 11;
case 'C':
return 12;
case 'D':
return 13;
case 'E':
return 14;
case 'F':
return 15;
}
throw new IllegalStateException("0-F");
}

public static String toGisWKT(Object value) throws Exception {
if (value == null) {
return null;
}
if (value instanceof String) {
String strVal = (String) value;
if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) {
return (String) value;
}
return WKB_READER.read(hex2bytes(strVal.substring(2))).toText();
} else if (value instanceof byte[]) {
return WKB_READER.read((byte[]) value).toText();
} else {
throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to gis failed.");
}
switch (Character.toUpperCase(src)) {
case '0':
return 0;
case '1':
return 1;
case '2':
return 2;
case '3':
return 3;
case '4':
return 4;
case '5':
return 5;
case '6':
return 6;
case '7':
return 7;
case '8':
return 8;
case '9':
return 9;
case 'A':
return 10;
case 'B':
return 11;
case 'C':
return 12;
case 'D':
return 13;
case 'E':
return 14;
case 'F':
return 15;
}
throw new IllegalStateException("0-F");
}
}
Loading

0 comments on commit 37e7078

Please sign in to comment.