Fails when topic name and scylla table names are different in config topic.my_topic.my_ks.my_table.mapping
#26
Labels
enhancement
New feature or request
The connector fails with the exception below when a mapping is provided for a topic whose name differs from the table name. It does not pick up the table name from the mapping config `topic.my_topic.my_ks.my_table.mapping`, whereas the DataStax Cassandra sink connector seems to handle this seamlessly for Cassandra.

[2020-10-07 10:13:16,763] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg.test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:250)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:153)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:147)
	at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:171)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:168)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:166)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
	... 11 more
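
To make the expected behaviour concrete, here is a rough sketch of how the keyspace and table could be read out of a key shaped like `topic.my_topic.my_ks.my_table.mapping`. This is not the connector's actual code: the class `TopicMappingKey` and its `parse` method are hypothetical names made up for illustration, and the sketch assumes the last two dot-separated segments before `.mapping` are the keyspace and table, so that a topic name containing dots would still parse.

```java
import java.util.Optional;

// Hypothetical helper, not part of the connector: splits a config key of the
// assumed form topic.<topic>.<keyspace>.<table>.mapping into its three parts.
final class TopicMappingKey {
    private static final String PREFIX = "topic.";
    private static final String SUFFIX = ".mapping";

    final String topic;
    final String keyspace;
    final String table;

    private TopicMappingKey(String topic, String keyspace, String table) {
        this.topic = topic;
        this.keyspace = keyspace;
        this.table = table;
    }

    // Returns empty when the key does not match the assumed shape.
    static Optional<TopicMappingKey> parse(String configKey) {
        if (configKey == null
                || configKey.length() < PREFIX.length() + SUFFIX.length()
                || !configKey.startsWith(PREFIX)
                || !configKey.endsWith(SUFFIX)) {
            return Optional.empty();
        }
        String middle = configKey.substring(
                PREFIX.length(), configKey.length() - SUFFIX.length());

        // Work from the right: the last segment is the table, the one before
        // it is the keyspace, and everything left over is the topic name.
        int tableDot = middle.lastIndexOf('.');
        if (tableDot <= 0 || tableDot == middle.length() - 1) {
            return Optional.empty();
        }
        int keyspaceDot = middle.lastIndexOf('.', tableDot - 1);
        if (keyspaceDot <= 0 || keyspaceDot + 1 == tableDot) {
            return Optional.empty();
        }
        return Optional.of(new TopicMappingKey(
                middle.substring(0, keyspaceDot),
                middle.substring(keyspaceDot + 1, tableDot),
                middle.substring(tableDot + 1)));
    }
}
```

With a lookup like this, records from `my_topic` would be written to `my_ks.my_table` rather than to a table named after the topic, which is the behaviour this issue is asking for.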