No support for nested json #19

Open
slivne opened this issue Jul 14, 2020 · 1 comment
Labels
enhancement New feature or request

Comments


slivne commented Jul 14, 2020

Extracted from the users Slack channel.

{
  "group_id": 1010,
  "competition_count": {
    "val": 20,
    "type": "ts"
  },
  "session_count": {
    "val": 20,
    "type": "ts"
  }
}
ScyllaDB create statements:
CREATE TYPE test.custom_type (val int, type text);

CREATE TABLE test.sample (
    group_id bigint,
    competition_count FROZEN<custom_type>,
    session_count FROZEN<custom_type>,
    PRIMARY KEY (group_id)
) WITH default_time_to_live = 172800;
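For reference, the intended mapping of the JSON payload above onto this schema would be equivalent to a direct CQL insert such as the following (a sketch, with the values taken from the example payload):

```sql
-- Hypothetical direct INSERT showing how the nested JSON objects
-- should map onto the frozen UDT columns of test.sample.
INSERT INTO test.sample (group_id, competition_count, session_count)
VALUES (1010, {val: 20, type: 'ts'}, {val: 20, type: 'ts'});
```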

By default, the connector converts the nested JSON into a map and fails, throwing an exception similar to the one below:

[2020-07-08 12:51:41,111] WARN [kafka-connect-scylladb-prod-json|task-0] Exception occurred while extracting records from Kafka Sink Records, ignoring and processing next set of records. (io.connect.scylladb.ScyllaDbSinkTask:252)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'competition_count'
	at io.connect.scylladb.RecordConverter.convertMap(RecordConverter.java:167)
	at io.connect.scylladb.RecordConverter.findRecordTypeAndConvert(RecordConverter.java:95)
	at io.connect.scylladb.RecordConverter.convert(RecordConverter.java:82)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:87)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Is there a workaround to save nested JSON into ScyllaDB without losing datatype information?
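[Editor's note: one possible workaround, not proposed in this thread and untested against this connector, is Kafka Connect's built-in `Flatten` single message transform, which rewrites nested fields into top-level ones. A sketch of the sink connector configuration (connector name, topic, and delimiter are illustrative):]

```json
{
  "name": "scylladb-sink",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "topics": "sample",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}
```

With this transform, `competition_count.val` would arrive as a flat field named `competition_count_val`, so the table would need plain columns (e.g. `competition_count_val int`, `competition_count_type text`) instead of the frozen UDTs — the type information is preserved per field, but the UDT structure is not.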

@mailmahee (Contributor) commented:

Thanks for reporting, @slivne. The Kafka connector should support Avro, simple JSON, or string formats; I'm not sure about nested JSON. I don't think we included it in the initial requirements, so I will add it as a requirement now.

@mailmahee mailmahee added the enhancement New feature or request label Jul 16, 2020