Skip to content

Commit

Permalink
Implemented the functions of file source connector.
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshSawarkar committed Jan 14, 2024
1 parent 60ec82d commit 9f1c144
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,5 @@ public class SourceConnectorConfig {

private String connectorName;

private String fileName;

private String filePath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
Expand All @@ -40,6 +41,7 @@

@Slf4j
public class FileSourceConnector implements Source {
private static final int BUFFER_SIZE = 8192;
private FileSourceConfig sourceConfig;
private String filePath;
private String fileName;
Expand All @@ -55,7 +57,7 @@ public void init(Config config) throws Exception {
// init config for hdfs source connector
this.sourceConfig = (FileSourceConfig) config;
this.filePath = ((FileSourceConfig) config).getConnectorConfig().getFilePath();
this.fileName = ((FileSourceConfig) config).getConnectorConfig().getFileName();
this.fileName = getFileName(filePath);
}

@Override
Expand All @@ -69,7 +71,8 @@ public void start() throws Exception {
if (fileName == null || fileName.isEmpty() || filePath == null || filePath.isEmpty()) {
this.bufferedReader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
} else {
this.bufferedReader = Files.newBufferedReader(Paths.get(filePath, fileName), StandardCharsets.UTF_8);
this.bufferedReader = new BufferedReader(new InputStreamReader(Files.newInputStream(
Paths.get(filePath, fileName)), StandardCharsets.UTF_8), BUFFER_SIZE);
}
}

Expand Down Expand Up @@ -100,7 +103,7 @@ public List<ConnectRecord> poll() {
RecordPartition recordPartition = convertToRecordPartition(fileName);
try {
int bytesRead;
char[] buffer = new char[1024];
char[] buffer = new char[BUFFER_SIZE];
while ((bytesRead = bufferedReader.read(buffer)) != -1) {
String line = new String(buffer, 0, bytesRead);
long timeStamp = System.currentTimeMillis();
Expand All @@ -118,4 +121,9 @@ public static RecordPartition convertToRecordPartition(String fileName) {
map.put("fileName", fileName);
return new RecordPartition(map);
}

private static String getFileName(String filePath) throws NullPointerException {
File file = new File(filePath);
return file.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ pubSubConfig:
passWord: filePassWord
connectorConfig:
connectorName: fileSource
fileName: userFileName
filePath: userFilePath

0 comments on commit 9f1c144

Please sign in to comment.