Skip to content

Commit

Permalink
[CYB-185] indexing doesnt start when two sources map to the same topic (
Browse files Browse the repository at this point in the history
  • Loading branch information
carolynduby authored May 16, 2024
1 parent aa113af commit 46f8087
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,17 @@ private void configure(StreamTableEnvironment tableEnv) {

protected final String buildInsertSql(String topic, MappingDto mappingDto, ResolvedSchema tableSchema) {
return String.join("\n",
getInsertSqlPrefix() + " " + mappingDto.getTableName() + "(" + getInsertColumns(mappingDto) + ") "
getInsertSqlPrefix() + " " + getTableName(topic, mappingDto) + "(" + getInsertColumns(mappingDto) + ") "
+ getInsertSqlSuffix(),
" SELECT " + getFromColumns(mappingDto, tableSchema),
" from " + KAFKA_TABLE,
String.format(" where `source`='%s'", topic));
}

protected String getTableName(String source, MappingDto mappingDto) {
return mappingDto.getTableName();
}

protected String getInsertSqlPrefix() {
return "INSERT INTO ";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected void executeInsert(StreamTableEnvironment tableEnv, Map<String, Mappin
mappingDto.getTableName(), "indexing-job", params);

//read from view and write to kafka sink
final Table table = tableEnv.from(mappingDto.getTableName());
final Table table = tableEnv.from(getTableName(topic, mappingDto));
final String schemaString = AvroSchemaUtil.convertToAvro(tablesConfig.get(mappingDto.getTableName()))
.toString();

Expand Down Expand Up @@ -98,6 +98,11 @@ protected FormatDescriptor getFormatDescriptor() {
return null;
}

@Override
protected String getTableName(String source, MappingDto mappingDto) {
return source.concat("_tmpview");
}

@Override
protected String getInsertSqlPrefix() {
return "CREATE TEMPORARY VIEW ";
Expand Down

0 comments on commit 46f8087

Please sign in to comment.