
Overview

This guide walks you through setting up all the components needed to replicate data from Postgres to TiDB through Kafka. The Debezium Postgres source connector captures changes from Postgres into Kafka, while the Confluent JDBC sink populates the captured data into TiDB. To minimize the data size in Kafka storage and the network cost, the Confluent Avro schema registry is used for data (de)serialization. The demo uses an SMT (Single Message Transform) to convert the data format from Debezium to Confluent. Finally, the yomo Debezium timestamp converter was developed to replicate the microseconds of timestamp values as well.

./png/pg2kafka2tidb/pg2kafka2tidb.png

Manual install

TiDB and postgres deployment

OhMyTiUP$./bin/aws pg2kafka2tidb deploy avrotest embed/examples/aws/aws-nodes-tidb2kafka2pg.yaml
... ...
OhMyTiUP$./bin/aws pg2kafka2tidb list avrotest
... ...
Load Balancer:      avrotest-a56a55d7ef009651.elb.us-east-1.amazonaws.com
Resource Type:      EC2
Component Name  Component Cluster  State    Instance ID          Instance Type  Private IP    Public IP      Image ID
--------------  -----------------  -----    -----------          -------------  -----------   ---------      --------
alert-manager   tidb               running  i-0309e30865a56daf5  c5.large       182.83.1.243                 ami-07d02ee1eeb0c996c
broker          kafka              running  i-097cf8767d9ebd73d  c5.xlarge      172.83.2.154                 ami-07d02ee1eeb0c996c
broker          kafka              running  i-030ce6c2cbbffacfa  c5.xlarge      172.83.1.176                 ami-07d02ee1eeb0c996c
broker          kafka              running  i-0d97b75de2338c009  c5.xlarge      172.83.3.162                 ami-07d02ee1eeb0c996c
connector       kafka              running  i-0b37c7cb4e224ab17  c5.xlarge      172.83.1.186                 ami-07d02ee1eeb0c996c
monitor         tidb               running  i-08faf5de486ee10ee  c5.large       182.83.2.10                  ami-07d02ee1eeb0c996c
monitor         tidb               running  i-08bd674bf980338fb  c5.large       182.83.1.180                 ami-07d02ee1eeb0c996c
pd              tidb               running  i-04efdc1d46e29980a  c5.large       182.83.1.236                 ami-07d02ee1eeb0c996c
restService     kafka              running  i-0a4fc44d146b8a3d3  c5.large       172.83.1.4                   ami-07d02ee1eeb0c996c
schemaRegistry  kafka              running  i-0e99f2f5acc24f32d  c5.large       172.83.1.200                 ami-07d02ee1eeb0c996c
ticdc           tidb               running  i-095c14cb0b9e1f555  c5.xlarge      182.83.1.210                 ami-07d02ee1eeb0c996c
tidb            tidb               running  i-0c66f72063b57eb99  c5.xlarge      182.83.1.51                  ami-07d02ee1eeb0c996c
tikv            tidb               running  i-030c327ba39ec45cd  c5.xlarge      182.83.3.223                 ami-07d02ee1eeb0c996c
tikv            tidb               running  i-09fb13ffba30f32b4  c5.xlarge      182.83.2.179                 ami-07d02ee1eeb0c996c
tikv            tidb               running  i-0872361380bcf14bc  c5.xlarge      182.83.1.92                  ami-07d02ee1eeb0c996c
workstation     workstation        running  i-0910f69cabcc93f79  c5.2xlarge     172.82.11.85  3.215.175.247  ami-07d02ee1eeb0c996c
zookeeper       kafka              running  i-05fb19cf30037352e  c5.large       172.83.1.135                 ami-07d02ee1eeb0c996c
zookeeper       kafka              running  i-0b666601d983701ac  c5.large       172.83.3.44                  ami-07d02ee1eeb0c996c
zookeeper       kafka              running  i-0d97c64ac6458750c  c5.large       172.83.2.167                 ami-07d02ee1eeb0c996c
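
Before moving on, it is worth confirming from the workstation that the Kafka brokers are reachable. A minimal check, assuming the Confluent CLI tools are already installed on the workstation (the same kafka-topics command is used later to inspect topics):

OhMyTiUP$ ssh 3.215.175.247                                        <- workstation public IP from the listing above
workstation$ kafka-topics --list --bootstrap-server 172.83.3.162:9092

If the brokers are reachable, the command returns the list of existing topics without error.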

MySQL driver install

The Confluent JDBC Sink (MySQL) uses the MySQL JDBC driver to insert data into TiDB, so the driver has to be installed on the Connect workers.

OhMyTiUP$ ssh 3.215.175.247
Workstation$ ssh 172.83.1.186
connector$ sudo apt-get update
... ...
connector$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.tar.gz
connector$ tar xvf mysql-connector-java-5.1.46.tar.gz
connector$ sudo cp mysql-connector-java-5.1.46/*.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
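
Optionally, confirm that the driver jar is now in the JDBC sink plugin's lib directory (the same path used in the copy command above); the mysql-connector-java jar should show up in the listing:

connector$ ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ | grep -i mysql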

Postgres source connector deployment

Use confluent-hub to install the Debezium Postgres connector on the Kafka Connect servers. The demo uses it to capture the change data from Postgres.

connector$sudo confluent-hub install debezium/debezium-connector-postgresql:1.9.6
bash: warning: setlocale: LC_ALL: cannot change locale (ja_JP.UTF-8)
The component can be installed in any of the following Confluent Platform installations: 
  1. / (installed rpm/deb package) 
  2. / (where this tool is installed) 
Choose one of these to continue the installation (1-2): 1
Do you want to install this into /usr/share/confluent-hub-components? (yN)y

Component's license: 
Apache 2.0 
https://github.com/debezium/debezium/blob/master/LICENSE.txt 
I agree to the software license agreement (yN)y

You are about to install 'debezium-connector-postgresql' from Debezium Community, as published on Confluent Hub. 
Do you want to continue? (yN)y

Downloading component Debezium PostgreSQL CDC Connector 1.9.6, provided by Debezium Community from Confluent Hub and installing into /usr/share/confluent-hub-components 
Detected Worker's configs: 
  1. Standard: /etc/kafka/connect-distributed.properties 
  2. Standard: /etc/kafka/connect-standalone.properties 
  3. Standard: /etc/schema-registry/connect-avro-distributed.properties 
  4. Standard: /etc/schema-registry/connect-avro-standalone.properties 
  5. Used by Connect process with PID 17983: /etc/kafka/connect-distributed.properties 
Do you want to update all detected configs? (yN)y


Adding installation directory to plugin path in the following files: 
  /etc/kafka/connect-distributed.properties 
  /etc/kafka/connect-standalone.properties 
  /etc/schema-registry/connect-avro-distributed.properties 
  /etc/schema-registry/connect-avro-standalone.properties 
  /etc/kafka/connect-distributed.properties 
 
Completed 

Install SPI to support microsecond replication

connector$ sudo wget https://github.com/luyomo/yomo-timestamp-converter/releases/download/v0.0.1/YomoTimestampConverter-1.2.3-SNAPSHOT.jar -P /usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/lib/

Restart connect service

Restart the service so that the Postgres source connector and the MySQL driver take effect. If you have defined multiple Connect workers, restart the service on every Connect worker.

connector$ sudo systemctl restart confluent-kafka-connect
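
Once the worker is back up, you can check that it picked up the newly installed plugins through the Kafka Connect REST API. A minimal check, assuming the REST interface listens on its default port 8083 on this host:

connector$ curl -s http://localhost:8083/connector-plugins | jq '.[].class'

The returned list should now include io.debezium.connector.postgresql.PostgresConnector alongside the Confluent JDBC sink connector class.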

Setup postgres source connector

Postgres test db preparation

Please make sure you have completed the setup below.

  • Set wal_level to logical. A replication slot is used to capture the changes. If you run Postgres in a primary-standby topology, you will also have to consider how to carry the replication slot over to the standby; otherwise a failover will stop the replication.
  • Create a replication user with the appropriate permissions. Please check the Debezium Postgres documentation for the user permission setup; a hedged example follows after the session below.
workstation$ psql -h avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com -U kafkauser -p 5432 postgres
postgres=> show wal_level;
 wal_level 
-----------
 logical
(1 row)
postgres=> create database test;
CREATE DATABASE
postgres=> grant all on database test to kafkauser;
GRANT
test=>exit
workstation$ psql -h avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com -U kafkauser -p 5432 postgres test
test=> create schema test;
CREATE SCHEMA
test=> create table test.test01(col01 int primary key, col02 int);
CREATE TABLE
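
For reference, a minimal sketch of the replication-user permissions mentioned above. The role, database, and schema names follow this demo; on Amazon RDS (as used here) the REPLICATION attribute cannot be set directly, so the built-in rds_replication role is granted instead. Debezium's Postgres documentation lists further requirements (for example around publications for pgoutput), so treat this only as a starting point.

-- Self-managed Postgres: login, replication and read access on the replicated tables
CREATE ROLE kafkauser WITH LOGIN REPLICATION PASSWORD '1234Abcd';
GRANT CONNECT ON DATABASE test TO kafkauser;
GRANT USAGE ON SCHEMA test TO kafkauser;
GRANT SELECT ON ALL TABLES IN SCHEMA test TO kafkauser;

-- Amazon RDS: grant the built-in replication role instead of the REPLICATION attribute
GRANT rds_replication TO kafkauser;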

Connector preparation

  • Prepare the connector configuration, replacing the values according to your environment.
    workstation$ more /opt/db-info.yml                        <- Postgres connection info from OhMyTiUP
    Host: avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com
    Port: 5432
    User: kafkauser
    Password: 1234Abcd
    workstation$ more /opt/kafka/source.pg.yaml
    {
      "name": "sourcepg",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",             <- Postgres hostname
        "database.port": "5432",                                                              <- Postgres port
        "database.user": "kafkauser",                                                         <- Postgres user
        "database.password": "1234Abcd",                                                      <- Postgres password
        "database.dbname" : "test",                                                           <- Sync DB
        "database.server.name": "sourcepg",
        "plugin.name": "pgoutput",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.200:8081",                      <- Schema registry
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://172.83.1.200:8081",                    <- Schema registry
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstone": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "converters": "timestampConverter",
        "timestampConverter.type": "yomo.kafka.connect.util.TimestampConverter",
        "timestampConverter.format.time": "HH:mm:ss.SSSSSS",
        "timestampConverter.format.date": "YYYY-MM-dd",
        "timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss.SSSSSS",
        "timestampConverter.debug": "false"
      }
    }
        
  • Source connector creation
    workstation$ curl -H 'Content-Type: Application/JSON' http://172.83.1.186:8083/connectors -d @'/opt/kafka/source.pg.yaml' | jq 
    {
      "name": "sourcepg",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",
        "database.port": "5432",
        "database.user": "kafkauser",
        "database.password": "1234Abcd",
        "database.dbname": "test",
        "database.server.name": "sourcepg",
        "plugin.name": "pgoutput",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.200:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://172.83.1.200:8081",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstone": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "name": "sourcepg",
        "converters": "timestampConverter",
        "timestampConverter.type": "yomo.kafka.connect.util.TimestampConverter",
        "timestampConverter.format.time": "HH:mm:ss.SSSSSS",
        "timestampConverter.format.date": "YYYY-MM-dd",
        "timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss.SSSSSS",
        "timestampConverter.debug": "false"
      },
      "tasks": [],
      "type": "source"
    }
    
    workstation$ curl http://172.83.1.186:8083/connectors/sourcepg/status | jq 
    {
      "name": "sourcepg",
      "connector": {
        "state": "RUNNING",
        "worker_id": "172.83.1.186:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "172.83.1.186:8083"
        }
      ],
      "type": "source"
    }
        
  • Check kafka topic
    $ kafka-topics --list --bootstrap-server 172.83.3.162:9092 
    __consumer_offsets
    _schemas
    connect-configs
    connect-offsets
    connect-status
        
  • Insert test data into postgres
    test=> insert into test.test01 values(1,1);
    INSERT 0 1
        
  • Check the generated topic and offset
    workstation$ kafka-topics --list --bootstrap-server 172.83.3.162:9092 
    __consumer_offsets
    _schemas
    connect-configs
    connect-offsets
    connect-status
    sourcepg.test.test01
    workstation$ kafka-run-class kafka.tools.GetOffsetShell --bootstrap-server 172.83.3.162:9092 --topic sourcepg.test.test01   
    sourcepg.test.test01:0:1
        
  • Check the data structure inside the kafka
    workstation$ kafka-avro-console-consumer --bootstrap-server 172.83.3.162:9092 --topic sourcepg.test.test01 --partition 0 --from-beginning --property schema.registry
    .url="http://172.83.1.200:8081" --property print.key=true --property print.value=true
    {"col01":1}     {"col01":1,"col02":{"int":1}}
        

TiDB sink deployment

Table preparation in TiDB

$ mysql -h avrotest-a56a55d7ef009651.elb.us-east-1.amazonaws.com -u root -P 4000 test
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MySQL connection id is 1351
Server version: 5.7.25-TiDB-v6.3.0 TiDB Server (Apache License 2.0) Community Edition, MySQL 5.7 compatible

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MySQL [test]> create table test.test01(col01 int primary key, col02 int);
Query OK, 0 rows affected (0.105 sec)

MySQL [test]> 

Sink connector preparation

$ more /opt/tidb-db-info.yml 
Host: avrotest-a56a55d7ef009651.elb.us-east-1.amazonaws.com
Port: 4000
User: root
Password: 

$ more /opt/kafka/sink.tidb.yaml
{
    "name": "SINKTiDB",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://avrotest-a56a55d7ef009651.elb.us-east-1.amazonaws.com:4000/test?stringtype=unspecified",    <- TiDB connection string
        "connection.user": "root",                                                                                                  <- TiDB user
        "connection.password": "",                                                                                                  <- TiDB user password
        "topics": "sourcepg.test.test01",                                                                                           <- source topic
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "dialect.name":"MySqlDatabaseDialect",
        "table.name.format":"test.test01",                                                                                          <- table name
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.200:8081",                                                            <- schema registry for key
        "value.converter.schema.registry.url": "http://172.83.1.200:8081",                                                          <- schema registry for value
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "pk.mode": "record_key",
        "auto.create":"false",
        "auto.evolve":"false"
    }
}
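
Register the sink connector with the Kafka Connect REST API in the same way as the source connector; the full response and status check are listed in the appendix at the end of this document.

workstation$ curl -H 'Content-Type: Application/JSON' http://172.83.1.186:8083/connectors -d @'/opt/kafka/sink.tidb.yaml' | jq
workstation$ curl http://172.83.1.186:8083/connectors/SINKTiDB/status | jq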

Verification

Check the data in TiDB

MySQL [test]> select * from test.test01;
+-------+-------+
| col01 | col02 |
+-------+-------+
|     1 |     1 |
+-------+-------+
1 row in set (0.002 sec)

Replicate Insert Event

  • Insert one row to postgres
     test=> insert into test.test01 values(2,2);
    INSERT 0 1
        
  • Check the inserted data in TiDB
    MySQL [test]> select * from test.test01;
    +-------+-------+
    | col01 | col02 |
    +-------+-------+
    |     1 |     1 |
    |     2 |     2 |
    +-------+-------+
    2 rows in set (0.002 sec)
        

Replicate Update Event

  • Update one row
    test=> update test.test01 set col02 = 20 where col01 = 2; 
    UPDATE 1
        
  • Check the updated value in TiDB
    MySQL [test]> select * from test.test01;
    +-------+-------+
    | col01 | col02 |
    +-------+-------+
    |     1 |     1 |
    |     2 |    20 |
    +-------+-------+
    2 rows in set (0.002 sec)
        

Replicate Delete Event

  • Delete one row from postgres
    test=> delete from test.test01 where col01 = 2; 
    DELETE 1
        
  • Check that the row has been deleted in TiDB
    MySQL [test]> select * from test.test01;
    +-------+-------+
    | col01 | col02 |
    +-------+-------+
    |     1 |     1 |
    +-------+-------+
    1 row in set (0.002 sec)
        

Data Mapping

| PG Data Type                  | OK/NG | TiDB Data Type              | Comment                                                 | Priority     |
|-------------------------------+-------+-----------------------------+---------------------------------------------------------+--------------|
| bigint                        | OK    | BIGINT                      |                                                         |              |
| bigserial                     | OK    | BIGINT                      |                                                         |              |
| bit                           | OK    | BIT                         |                                                         |              |
| bit varying                   | OK    | BIT(m)                      |                                                         |              |
| boolean                       | OK    | TINYINT(1)                  |                                                         |              |
| bytea                         | OK    | LONGBLOB                    |                                                         |              |
| character                     | OK    | CHAR/LONGTEXT               |                                                         |              |
| NATIONAL CHARACTER            |       | CHAR/LONGTEXT               |                                                         |              |
| character varying             | OK    | VARCHAR/MEDIUMTEXT/LONGTEXT |                                                         |              |
| DECIMAL                       | OK    | DECIMAL                     |                                                         |              |
| integer                       | OK    | INT                         |                                                         |              |
| money                         | OK    | DECIMAL(19,2)               |                                                         |              |
| numeric                       | NG    | DECIMAL                     |                                                         |              |
| real                          |       | FLOAT                       |                                                         |              |
| smallint                      | OK    | SMALLINT                    |                                                         |              |
| smallserial                   | OK    | SMALLINT                    |                                                         |              |
| serial                        | OK    | INT                         |                                                         |              |
| text                          | OK    | LONGTEXT                    |                                                         |              |
| uuid                          | OK    | VARCHAR(36)                 |                                                         |              |
| time (without time zone)      | OK    | TIME                        | Use custom converter to replicate microseconds from PG |              |
| time (with time zone)         | NG    | TIME                        |                                                         |              |
| timestamp (without time zone) | OK    | DATETIME                    | Use custom converter to replicate microseconds from PG |              |
| timestamp (with time zone)    | NG    | DATETIME                    |                                                         |              |
| interval                      | NG    | TIME                        | Please check issue                                      |              |
| date                          | OK    | DATE                        |                                                         |              |
| array                         | NG    | LONGTEXT                    | Unsupported                                             |              |
| xml                           |       | LONGTEXT                    |                                                         |              |
| json                          | OK    | JSON                        |                                                         |              |
| jsonb                         | OK    | JSON                        |                                                         |              |
| cidr                          | NG    | VARCHAR(43)                 |                                                         | low priority |
| inet                          |       | VARCHAR(43)                 |                                                         |              |
| macaddr                       |       | VARCHAR(17)                 |                                                         |              |
| macaddr8                      |       |                             |                                                         |              |
| box                           | NG    | POLYGON                     |                                                         | low priority |
| point                         |       | POINT                       |                                                         |              |
| polygon                       |       | POLYGON                     |                                                         |              |
| circle                        |       | POLYGON                     |                                                         |              |
| line                          |       | LINESTRING                  |                                                         |              |
| path                          |       | LINESTRING                  |                                                         |              |
| lseg                          |       | LINESTRING                  |                                                         |              |
| pg_lsn                        |       |                             |                                                         |              |
| pg_snapshot                   |       |                             |                                                         |              |
| txid_snapshot                 |       | VARCHAR                     |                                                         |              |
| tsquery                       |       | LONGTEXT                    |                                                         |              |
| tsvector                      |       | LONGTEXT                    |                                                         |              |
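
As an illustration of the mapping above, a hedged example of a matching table pair built only from the OK rows (the table and column names are made up for this sketch; the TiDB table has to be created manually because auto.create is disabled in the sink, and the sink's topics/table.name.format settings would also need to point at it):

-- Postgres side
create table test.types01 (
    id        bigint primary key,   -- BIGINT in TiDB
    is_active boolean,              -- TINYINT(1)
    amount    money,                -- DECIMAL(19,2)
    note      text,                 -- LONGTEXT
    ref_uuid  uuid                  -- VARCHAR(36)
);

-- TiDB side
create table test.types01 (
    id        BIGINT primary key,
    is_active TINYINT(1),
    amount    DECIMAL(19,2),
    note      LONGTEXT,
    ref_uuid  VARCHAR(36)
);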

Default replication event format

Debezium default CDC

{
  "name": "testpg",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",
    "database.port": "5432",
    "database.user": "kafkauser",
    "database.password": "1234Abcd",
    "database.dbname" : "test",
    "database.server.name": "fulfillment",
    "plugin.name": "pgoutput",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://172.83.1.193:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://172.83.1.193:8081"
  }
}

CDC data format in the kafka

insert into test01 values(4,4,current_timestamp, current_timestamp);  -->
kafka event: {"pk_col":4}    

update test01 set t_int=10 where pk_col = 1152921504606846977;        -->
kafka event: {"pk_col":1152921504606846977}  {"before":null,"after":{"pgsource01.public.test01.Value":{"pk_col":1152921504606846977,"t_int":{"int":10},"tidb_timestamp":{"long":1666707909000000},"pg_timestamp":{"long":1666707912597157}}},"source":{"version":"1.9.6.Final","connector":"postgresql","name":"pgsource01","ts_ms":1666708919887,"snapshot":{"string":"false"},"db":"test","sequence":{"string":"[\"13220446448\",\"13220446728\"]"},"schema":"public","table":"test01","txId":{"long":850},"lsn":{"long":13220446728},"xmin":null},"op":"u","ts_ms":{"long":1666708920091},"transaction":null}

delete from test01 where t_int = 10;                                  -->
kafka event: {"pk_col":1152921504606846977}  null

Issues

Confluent JDBC Sink event format requirement

The Confluent JDBC Sink is used to fetch the data from Kafka and populate it into TiDB. The event format it expects is as below:

CREATE TABLE `test01` (
  `pk_col` bigint(20) NOT NULL /*T![auto_rand] AUTO_RANDOM(5) */,
  `t_int` int(11) DEFAULT NULL,
  `tidb_timestamp` timestamp DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`pk_col`) /*T![clustered_index] CLUSTERED */
)

insert into test01(t_int, tidb_timestamp) values(1,current_timestamp);        -->
{"pk_col":1152921504606846977}  {"pk_col":1152921504606846977,"t_int":{"int":1},"tidb_timestamp":{"string":"2022-10-25 14:25:09"}}

delete from test01;                                                           -->
{"pk_col":1152921504606846977}  null

update test01 set pk_col = 2 where id = xxxx;                                 -->
{"pk_col":5476377146882523139}  {"pk_col":5476377146882523139,"t_int":{"int":2},"tidb_timestamp":{"string":"2022-10-25 14:30:07"}}

Unwrap the message in the PG source connector

To unwrap the CDC envelope, use the ExtractNewRecordState transform. However, with [delete.handling.mode] set to rewrite, an additional [__deleted] column is generated to indicate the event operation, and this column cannot be replicated into TiDB.

{"col01":1}     {"col01":1,"col02":{"int":1},"__deleted":{"string":"false"}}
{"col01":2}     {"col01":2,"col02":{"int":2},"__deleted":{"string":"false"}}
{"col01":1}     {"col01":1,"col02":null,"__deleted":{"string":"true"}}

{
  "name": "pgsource02",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",
    "database.port": "5432",
    "database.user": "kafkauser",
    "database.password": "1234Abcd",
    "database.dbname" : "test",
    "database.server.name": "pgsource02",
    "plugin.name": "pgoutput",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://172.83.1.193:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://172.83.1.193:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}

https://stackoverflow.com/questions/72430748/confluent-jdbc-sink-connector-cant-recognize-record-captured-by-debezium-connec

Use the ExtractNewRecordState unwrap transform with [delete.handling.mode] set to none

  • source config
    {
      "name": "pgsource05",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",
        "database.port": "5432",
        "database.user": "kafkauser",
        "database.password": "1234Abcd",
        "database.dbname" : "test",
        "database.server.name": "pgsource05",
        "plugin.name": "pgoutput",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.193:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://172.83.1.193:8081",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstone": "true",
        "transforms.unwrap.delete.handling.mode": "none"
      }
    }
        
  • sink config
    {
        "name": "JDBCTEST01",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:mysql://avrotest-3780e8fd349b34df.elb.us-east-1.amazonaws.com:4000/test?stringtype=unspecified",
            "connection.user": "root",
            "connection.password": "",
            "topics": "pgsource05.test.test02",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "dialect.name":"MySqlDatabaseDialect",
            "table.name.format":"test.test02",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://172.83.1.193:8081",
            "value.converter.schema.registry.url": "http://172.83.1.193:8081",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "true",
            "pk.mode": "record_key",
            "auto.create":"false",
            "auto.evolve":"false"
        }
    }
        

Issue: Failed to transform the epoch value into the target database in the sink connector

Epoch: Debezium CDC captures the datetime as an epoch value in microseconds, which cannot be imported into the target database directly.

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.delive rMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteratio n(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.  java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at ja va.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java .util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.Connect Exception: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Data truncation: Incorrect timestamp value: '1667137970790403' for column 'pg_timestamp' at row 1\ ncom.mysql.jdbc.MysqlDataTruncation: Data truncation: Incorrect timestamp value: '1667137970790403' for column 'pg_timestamp' at row 1\n
at io.confluent.connect.jdbc.sink.JdbcSin kTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more\nCaused by: java.sql.SQLException: Exce ption chain:\njava.sql.BatchUpdateException: Data truncation: Incorrect timestamp value: '1667137970790403' for column 'pg_timestamp' at row 1\ncom.mysql.jdbc.MysqlDataTruncation: D ata truncation: Incorrect timestamp value: '1667137970790403' for column 'pg_timestamp' at row 1\n
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
...  11 more\n

Resolution 01: First, add the TimestampConverter SMT. The options are added as below:

... ...
"transforms": "timestamp",
"transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp.target.type": "Timestamp",
"transforms.timestamp.field":"pg_timestamp",
"transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss"

This, however, produces a wrong conversion from epoch to timestamp; the microsecond epoch value appears to be interpreted as milliseconds, yielding the far-future timestamp in the error below:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) 
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Data truncation: Incorrect timestamp value: '54799-07-02 08:46:30.403' for column 'pg_timestamp' at row 1\ncom.mysql.jdbc.MysqlDataTruncation: Data truncation: Incorrect timestamp value: '54799-07-02 08:46:30.403' for column 'pg_timestamp' at row 1\n
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more\nCaused by: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Data truncation: Incorrect timestamp value: '54799-07-02 08:46:30.403' for column 'pg_timestamp' at row 1\ncom.mysql.jdbc.MysqlDataTruncation: Data truncation: Incorrect timestamp value: '54799-07-02 08:46:30.403' for column 'pg_timestamp' at row 1\n
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
... 11 more\n

Resolution 02: Found a related kafka-connect-jdbc issue, and another project, the debezium timestamp converter, which resolves this kind of conversion issue.

The converter installation was tried as follows:

  • Attempt 01
    connector$ wget https://github.com/oryanmoshe/debezium-timestamp-converter/releases/download/v1.2.4/TimestampConverter-1.2.4-SNAPSHOT.jar
    connector$ sudo mv TimestampConverter-1.2.4-SNAPSHOT.jar /usr/share/java/kafka/
    connector$ more /var/log/kafka/connect.log
    ... ...
    [2022-10-31 05:27:40,481] ERROR [SINKTiDB|task-0] WorkerSinkTask{id=SINKTiDB-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
            at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2022-10-31T05:16:31.552Z) does not match pattern (yyyy-MM-dd HH:mm:ss.SSS)
            at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
            at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
            at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
            at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
            at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
            at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
            ... 14 more
    Caused by: java.text.ParseException: Unparseable date: "2022-10-31T05:16:31.552Z"
            at java.base/java.text.DateFormat.parse(DateFormat.java:395)
            at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:117)
            ... 21 more
        
  • Attempt 02: install the jar file into the connector's lib directory
    connector$ sudo wget https://github.com/oryanmoshe/debezium-timestamp-converter/releases/download/v1.2.4/TimestampConverter-1.2.4-SNAPSHOT.jar -P /usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/lib/
    connector$ sudo systemctl restart confluent-kafka-connect
    workstation$ more /opt/kafka/source.pg.json
    {
      "name": "sourcepg",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "avrotest.cxmxisy1o2a2.us-east-1.rds.amazonaws.com",
        "database.port": "5432",
        "database.user": "kafkauser",
        "database.password": "1234Abcd",
        "database.dbname" : "test",
        "database.server.name": "sourcepg",
        "plugin.name": "pgoutput",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.179:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://172.83.1.179:8081",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstone": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "converters": "timestampConverter",
        "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
        "timestampConverter.format.time": "HH:mm:ss.SSS",
        "Timestampconverter.format.date": "YYYY-MM-dd",
        "timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss.SSS",
        "timestampConverter.debug": "false"
      }
    }
        

Issue: Microseconds are truncated during replication

Using the debezium timestamp converter, the timestamp is successfully replicated from Postgres to TiDB. The remaining issue is that this converter could not replicate the microseconds to TiDB, since its datetime formats above only carry millisecond precision (.SSS); the yomo timestamp converter installed earlier uses microsecond (.SSSSSS) formats to address this.
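
A hedged way to verify the microsecond handling end to end (table and column names are illustrative; a separate sink configuration pointing at the new topic and table is assumed, and the TiDB column needs fractional-seconds precision 6 to keep the microseconds):

-- Postgres side
test=> create table test.ts01(col01 int primary key, col02 timestamp(6) without time zone);
test=> insert into test.ts01 values(1, clock_timestamp());

-- TiDB side
MySQL [test]> create table test.ts01(col01 int primary key, col02 datetime(6));
MySQL [test]> select * from test.ts01;   -- col02 should keep all six fractional digits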

Issue: The truncate operation is not supported

Issue: Interval conversion

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic sourcepg.test.test01 :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:326)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 11 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"sourcepg.test.test01\",\"fields\":[{\"name\":\"pk_col\",\"type\":{\"type\":\"long\",\"connect.default\":0},\"default\":0}],\"connect.name\":\"sourcepg.test.test01.Key\"}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:261)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:168)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
... 15 more\nCaused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store; error code: 50001
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:548)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:536)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:494)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:275)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:382)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:355)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:125)
... 17 more\n

Issue: The array type is not supported

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: ARRAY
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1569)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:149)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:143)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:78)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:182)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more

Issue: The box type is not supported

Failed to capture the b_box field; it is missing from the Kafka events below.

{"pk_col":10753}        {"pk_col":10753,"pg_timestamp":{"string":"2022-11-02 10:43:21.534"}}
{"pk_col":10754}        {"pk_col":10754,"pg_timestamp":{"string":"2022-11-02 10:43:21.535"}}
{"pk_col":10755}        {"pk_col":10755,"pg_timestamp":{"string":"2022-11-02 10:43:21.536"}}
{"pk_col":10756}        {"pk_col":10756,"pg_timestamp":{"string":"2022-11-02 10:43:21.537"}}
{"pk_col":10757}        {"pk_col":10757,"pg_timestamp":{"string":"2022-11-02 10:43:21.538"}}
{"pk_col":10758}        {"pk_col":10758,"pg_timestamp":{"string":"2022-11-02 10:43:21.538"}}
{"pk_col":10759}        {"pk_col":10759,"pg_timestamp":{"string":"2022-11-02 10:43:21.539"}}
{"pk_col":10760}        {"pk_col":10760,"pg_timestamp":{"string":"2022-11-02 10:43:21.540"}}
{"pk_col":10761}        {"pk_col":10761,"pg_timestamp":{"string":"2022-11-02 10:43:21.541"}}
{"pk_col":10762}        {"pk_col":10762,"pg_timestamp":{"string":"2022-11-02 10:43:21.542"}}
{"pk_col":10763}        {"pk_col":10763,"pg_timestamp":{"string":"2022-11-02 10:43:21.542"}}
{"pk_col":10764}        {"pk_col":10764,"pg_timestamp":{"string":"2022-11-02 10:43:21.543"}}
{"pk_col":10765}        {"pk_col":10765,"pg_timestamp":{"string":"2022-11-02 10:43:21.544"}}

Issue: CIDR conversion

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:326)
\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
\tat java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic sourcepg.test.test01 :
\tat io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
\tat org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
\tat org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:326)
\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
\t... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"sourcepg.test.test01\",\"fields\":[{\"name\":\"pk_col\",\"type\":{\"type\":\"long\",\"connect.default\":0},\"default\":0}],\"connect.name\":\"sourcepg.test.test01.Key\"}
\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:261)
\tat io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:168)
\tat io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
\tat io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
\t... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store; error code: 50001
\tat io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301)
\tat io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
\tat io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:548)
\tat io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:536)
\tat io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:494)
\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:275)
\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:382)
\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:355)
\tat io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:125)
\t... 17 more\

Issue: numeric conversion

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
\tat java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1569)
\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:149)
\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:143)
\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:78)
\tat io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:182)
\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
\t... 10 more

Reference

https://github.com/kyleconroy/pgoutput
https://mykidong.medium.com/howto-kafka-connect-153dcbd53d4a

TIDB SINK JSON CONFIG

  • Create connect
    $ curl -H 'Content-Type: Application/JSON' http://172.83.1.186:8083/connectors -d @'/opt/kafka/sink.tidb.yaml' | jq 
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100  1825  100   848  100   977  31407  36185 --:--:-- --:--:-- --:--:-- 67592
    {
      "name": "SINKTiDB",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://avrotest-a56a55d7ef009651.elb.us-east-1.amazonaws.com:4000/test?stringtype=unspecified",
        "connection.user": "root",
        "connection.password": "",
        "topics": "sourcepg.test.test01",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "dialect.name": "MySqlDatabaseDialect",
        "table.name.format": "test.test01",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://172.83.1.200:8081",
        "value.converter.schema.registry.url": "http://172.83.1.200:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "pk.mode": "record_key",
        "auto.create": "false",
        "auto.evolve": "false",
        "name": "SINKTiDB"
      },
      "tasks": [],
      "type": "sink"
    }
        
  • Status check
    $ curl http://172.83.1.186:8083/connectors/SINKTiDB/status | jq 
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100   166  100   166    0     0  55333      0 --:--:-- --:--:-- --:--:-- 83000
    {
      "name": "SINKTiDB",
      "connector": {
        "state": "RUNNING",
        "worker_id": "172.83.1.186:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "172.83.1.186:8083"
        }
      ],
      "type": "sink"
    }