Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Could not parse <field> to 'Date' #200

Open
acamillo opened this issue Feb 23, 2022 · 4 comments
Open

Could not parse <field> to 'Date' #200

acamillo opened this issue Feb 23, 2022 · 4 comments

Comments

@acamillo
Copy link

acamillo commented Feb 23, 2022

Hi,
I am using v2.0.62 of the library with Kafka connect 6.1.0.

I am trying to parse CSV with values as:
abcd, 2022-01-10T23:09:55.000Z, abcdef, ....

Using this schema

{
  "name": "com.github.jcustenborder.kafka.connect.model.Value",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "Log_Source": {
      "type": "STRING",
      "isOptional": false
    },
    "Time": {
      "name": "org.apache.kafka.connect.data.Timestamp",
      "type": "INT64",
      "version": 1,
      "isOptional": false
    },
    "ID": {
      "type": "STRING",
      "isOptional": false
    },
......
  }
}

Connector creation and configuration:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ohs-access-csv-spooldir-03/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "ohs_access_03",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "parser.timestamp.date.formats": "yyyy-MM-dd'\''T'\''HH:mm:ss.SSSXXX",
        "key.schema" : "{\n  \"name\" : \"com.github.jcustenborder.kafka.connect.model.Key\",\n  \"type\" : \"STRUCT\",\n  \"isOptional\" : false,\n  \"fieldSchemas\" : {\n    \"ID\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : false\n    }\n  }\n}",
        "value.schema" : "{\n  \"name\": \"com.github.jcustenborder.kafka.connect.model.Value\",\n  \"type\": \"STRUCT\",\n  \"isOptional\": false,\n  \"fieldSchemas\": {\n    \"Log_Source\": {\n      \"type\": \"STRING\",\n      \"isOptional\": false\n    },\n    \"Time\": {\n      \"name\": \"org.apache.kafka.connect.data.Timestamp\",\n      \"type\": \"INT64\",\n      \"version\": 1,\n      \"isOptional\": false\n    },\n    \"ID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": false\n    },\n    \"Entity\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Entity_Type\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Log_Entity\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Original_Log_Content\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Message\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Parse_Failed\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Entity_GUID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Data_Services_Load_Time\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Host_Name_Server\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Problem_Priority\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Annotation_Identifier\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Log_Index\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Solr_Cluster_CoreGroup_ID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Label\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    }\n  }\n}"

    }'

When running I get the following exception:

[2022-02-23 14:56:24,201] ERROR [ohs-access-csv-spooldir-03|task-0] Exception encountered processing line 1 of /data/unprocessed/test.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:258)
my-local-kafka-connect    | org.apache.kafka.connect.errors.DataException: Exception thrown while parsing data for 'Time'. linenumber=1
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:136)
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:254)
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:148)
my-local-kafka-connect    | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
my-local-kafka-connect    | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
my-local-kafka-connect    | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
my-local-kafka-connect    | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
my-local-kafka-connect    | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
my-local-kafka-connect    | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
my-local-kafka-connect    | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
my-local-kafka-connect    | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
my-local-kafka-connect    | 	at java.base/java.lang.Thread.run(Thread.java:834)
my-local-kafka-connect    | Caused by: org.apache.kafka.connect.errors.DataException: Could not parse 'Time' to 'Date'
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:113)
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:128)
my-local-kafka-connect    | 	... 11 more
my-local-kafka-connect    | Caused by: java.lang.IllegalStateException: Could not parse 'Time' to java.util.Date
my-local-kafka-connect    | 	at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:589)
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.utils.data.type.BaseDateTypeParser.parseString(BaseDateTypeParser.java:55)
my-local-kafka-connect    | 	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:109)
my-local-kafka-connect    | 	... 12 more 

Any idea what could be the cause of the issue?
Thank you.

@jcustenborder
Copy link
Owner

@acamillo Does your csv have a header? I'm wondering if it's actually trying to parse Time with that date format.

@acamillo
Copy link
Author

@jcustenborder yes it does have header. However as by recommendation in various blogs I thought to force the schema myself

@jcustenborder
Copy link
Owner

haha no please use the schema.

Try setting "csv.first.row.as.header":"true" in your config.

@acamillo
Copy link
Author

acamillo commented Feb 23, 2022

@jcustenborder
yes, that solved the problem. It was actually trying to parse the header line assuming it to be a normal data row.

Thanks for the super fast feedback.

If you don't mind taking a suggestion. The problem would have been much easier to spot (for me) if the exception output did contain the offending data row, that triggered the exception. Maybe in a future release ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants