Skip to content

Commit

Permalink
Refactor StandardPipelineTableMetaDataLoader (apache#33980)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 9, 2024
1 parent 4bb8ecc commit 7a897ad
Showing 1 changed file with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,24 @@ public PipelineTableMetaData getTableMetaData(final String schemaName, final Str
return result;
}

private void loadTableMetaData(final String schemaName, final String tableNamePattern) throws SQLException {
private void loadTableMetaData(final String schemaName, final String tableName) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData();
Map<ShardingSphereIdentifier, PipelineTableMetaData> tableMetaDataMap = loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableNamePattern);
this.tableMetaDataMap.putAll(tableMetaDataMap);
tableMetaDataMap.putAll(loadTableMetaData(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableName));
}
}

private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
Collection<String> tableNames = new LinkedList<>();
try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
tableNames.add(tableName);
tableNames.add(resultSet.getString("TABLE_NAME"));
}
}
Map<ShardingSphereIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>(tableNames.size(), 1F);
for (String each : tableNames) {
Collection<ShardingSphereIdentifier> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each);
Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> uniqueKeys = loadUniqueKeys(connection, schemaName, each);
Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
while (resultSet.next()) {
Expand All @@ -104,8 +102,7 @@ private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData0(
boolean primaryKey = primaryKeys.contains(columnName);
boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
boolean isUniqueKey = uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName.toString(), dataType, dataTypeName, isNullable, primaryKey, isUniqueKey);
columnMetaDataMap.put(columnName, columnMetaData);
columnMetaDataMap.put(columnName, new PipelineColumnMetaData(ordinalPosition, columnName.toString(), dataType, dataTypeName, isNullable, primaryKey, isUniqueKey));
}
}
Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
Expand All @@ -117,8 +114,18 @@ private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData0(
return result;
}

private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> loadUniqueIndexesOfTable(final Connection connection,
final String schemaName, final String tableName) throws SQLException {
/**
 * Load primary key column names of a table, ordered by their key sequence.
 *
 * @param connection JDBC connection used to read database metadata
 * @param schemaName schema to query, may be {@code null} when the dialect has no schema
 * @param tableName table whose primary key columns are loaded
 * @return primary key column identifiers in KEY_SEQ order
 * @throws SQLException if reading the JDBC metadata fails
 */
private Collection<ShardingSphereIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
    // TreeMap keyed by KEY_SEQ so composite primary key columns come out in declaration order.
    SortedMap<Short, ShardingSphereIdentifier> orderedPrimaryKeyColumns = new TreeMap<>();
    try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
        while (resultSet.next()) {
            Short keySequence = resultSet.getShort("KEY_SEQ");
            String columnName = resultSet.getString("COLUMN_NAME");
            orderedPrimaryKeyColumns.put(keySequence, new ShardingSphereIdentifier(columnName));
        }
    }
    return orderedPrimaryKeyColumns.values();
}

private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> loadUniqueKeys(final Connection connection,
final String schemaName, final String tableName) throws SQLException {
Map<String, SortedMap<Short, ShardingSphereIdentifier>> orderedColumnsOfIndexes = new LinkedHashMap<>();
// Set approximate=true to prevent Oracle driver 19 from running `analyze table`
try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, true)) {
Expand All @@ -127,8 +134,8 @@ private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> load
if (null == indexName) {
continue;
}
orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(
resultSet.getShort("ORDINAL_POSITION"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
orderedColumnsOfIndexes.computeIfAbsent(indexName,
unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
}
}
Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> result = new LinkedHashMap<>();
Expand All @@ -138,14 +145,4 @@ private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> load
}
return result;
}

/**
 * Load primary key column names of a table, ordered by their key sequence.
 *
 * @param connection JDBC connection used to read database metadata
 * @param schemaName schema to query, may be {@code null} when the dialect has no schema
 * @param tableName table whose primary key columns are loaded
 * @return primary key column identifiers in KEY_SEQ order
 * @throws SQLException if reading the JDBC metadata fails
 */
private Collection<ShardingSphereIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
// Sorted by KEY_SEQ so composite primary key columns keep their declaration order.
SortedMap<Short, ShardingSphereIdentifier> result = new TreeMap<>();
try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
while (resultSet.next()) {
result.put(resultSet.getShort("KEY_SEQ"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
}
}
return result.values();
}
}

0 comments on commit 7a897ad

Please sign in to comment.