Skip to content

Commit

Permalink
Multiple fixes (#166)
Browse files Browse the repository at this point in the history
* Changed parent version to bring in alternative dependencies. Fixes #162. Fixes #156.

* Refactored to store all references to streams and readers in the InputFile class to ensure they are closed. Cleanup policies will now ensure that file handles are closed. Fixes #163.

* Added example of copying headers to field names. Fixes #149.

* Schema settings were not being sent to tasks causing the tasks to error. Fixes #157.

* CsvSchemaGenerator was not using the builder to create an instance of the CSVReader. Fixes #164.

* Added a check for blank lines in a file; any blank lines are now skipped. Fixes #133.
  • Loading branch information
jcustenborder authored Jan 19, 2021
1 parent 43fab15 commit 10dcc69
Show file tree
Hide file tree
Showing 12 changed files with 11,017 additions and 199 deletions.
29 changes: 15 additions & 14 deletions bin/debug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@
: ${ERROR_PATH:='/tmp/spooldir/error'}
: ${FINISHED_PATH:='/tmp/spooldir/finished'}
: ${DEBUG_SUSPEND_FLAG:='y'}
export KAFKA_DEBUG='n'
export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000'
export KAFKA_DEBUG='y'
export DEBUG_SUSPEND_FLAG='y'
# export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000'
set -e

mvn clean package
# mvn clean package

#if [ ! -d "${INPUT_PATH}" ]; then
# mkdir -p "${INPUT_PATH}"
#fi
if [ ! -d "${INPUT_PATH}" ]; then
mkdir -p "${INPUT_PATH}"
fi

#if [ ! -d "${ERROR_PATH}" ]; then
# mkdir -p "${ERROR_PATH}"
#fi
if [ ! -d "${ERROR_PATH}" ]; then
mkdir -p "${ERROR_PATH}"
fi

#if [ ! -d "${FINISHED_PATH}" ]; then
# mkdir -p "${FINISHED_PATH}"
#fi
if [ ! -d "${FINISHED_PATH}" ]; then
mkdir -p "${FINISHED_PATH}"
fi

#cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv
connect-standalone config/connect-avro-docker.properties config/ELFTesting.properties
cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv"
connect-standalone config/connect-avro-docker.properties config/CSVSchemaGenerator.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.jcustenborder.kafka.connect.spooldir;

import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.ICSVParser;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -44,7 +45,8 @@ protected Map<String, Schema.Type> determineFieldTypes(InputStream inputStream)
Map<String, Schema.Type> typeMap = new LinkedHashMap<>();
ICSVParser parserBuilder = this.config.createCSVParserBuilder();
try (InputStreamReader reader = new InputStreamReader(inputStream)) {
try (CSVReader csvReader = new CSVReader(reader, 0, parserBuilder)) {
CSVReaderBuilder readerBuilder = this.config.createCSVReaderBuilder(reader, parserBuilder);
try (CSVReader csvReader = readerBuilder.build()) {
String[] headers = null;

if (this.config.firstRowAsHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public long recordOffset() {
if (null == this.csvReader) {
result = -1L;
} else {
result = this.csvReader.getLinesRead();
result = this.csvReader.getLinesRead() - this.config.skipLines -
(this.config.firstRowAsHeader ? 1 : 0);
}
return result;
}
Expand All @@ -103,7 +104,10 @@ public List<SourceRecord> process() throws IOException {
while (records.size() < this.config.batchSize) {
String[] row = this.csvReader.readNext();

if (row == null) {
if (null == row) {
break;
}
if (row.length == 1 && null == row[0]) {
break;
}
log.trace("process() - Row on line {} has {} field(s)", recordOffset(), row.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color
1,Jack,Garcia,[email protected],Male,196.56.44.185,2015-09-30T15:29:03Z,347.77,IT,#4a2313
2,John,Kim,[email protected],Male,53.19.132.185,2015-11-14T10:34:09Z,251.24,CZ,#3e56cf
3,Ashley,Austin,[email protected],Female,21.164.37.9,,819.47,CN,
4,Jonathan,Mcdonald,[email protected],Male,188.172.42.140,2015-12-28T14:37:01Z,868.38,ID,#1b1414
5,Helen,Lane,[email protected],Female,159.171.138.190,2016-06-30T18:41:18Z,398.97,TN,
6,Scott,Lopez,[email protected],Male,86.194.226.35,2015-08-13T02:13:51Z,322.99,BR,
7,Christine,Franklin,[email protected],Female,248.173.207.64,2015-12-22T11:29:57Z,301.26,PH,#1d5e9d
8,Helen,Andrews,[email protected],Female,83.160.63.181,2016-03-06T11:41:10Z,217.96,CU,
9,Stephanie,Gordon,[email protected],Female,193.143.42.212,2015-10-27T22:07:24Z,495.80,CN,
10,Shirley,Andrews,[email protected],Female,99.113.183.206,2015-11-07T11:12:52Z,157.75,BR,#fc1da9
11,Joshua,Reid,[email protected],Male,197.96.118.164,2015-08-22T13:16:18Z,431.80,CO,#6e3e36
12,Frances,Parker,[email protected],Female,226.237.57.25,2015-10-18T01:50:15Z,188.21,BR,#73e909
13,Sharon,Lawson,[email protected],Female,198.189.134.106,2016-01-14T17:51:09Z,206.73,VN,
14,Elizabeth,Wells,[email protected],Female,120.108.59.206,2015-09-02T21:53:07Z,499.48,CZ,#e9c943
15,Norma,Wilson,[email protected],Female,18.246.76.220,2015-09-27T02:10:48Z,-65.19,SE,#645119
16,Joan,Watkins,[email protected],Female,240.27.33.114,2016-03-31T00:29:14Z,264.23,PH,
17,Gerald,Hamilton,[email protected],Male,182.75.62.95,2016-02-10T14:29:35Z,309.26,ID,
18,Paula,Taylor,[email protected],Female,245.74.203.0,2016-05-11T03:15:10Z,927.45,CN,
19,Carolyn,Burns,[email protected],Female,180.243.11.10,2016-02-28T18:49:23Z,752.76,NL,
20,Robin,Bennett,[email protected],Female,169.77.92.179,2016-02-15T01:06:44Z,143.30,ID,#506128




Loading

0 comments on commit 10dcc69

Please sign in to comment.