Skip to content

Commit

Permalink
Move tableless sql route in sharding to TablelessRouteEngineFactory (a…
Browse files Browse the repository at this point in the history
…pache#33829)

* Move tableless sql route in sharding to TablelessRouteEngineFactory

* add unit test for TablelessRouteEngineFactory

* refactor ShardingRouteEngineFactory

* refactor TablelessRouteEngineFactory

* increase wait seconds to 60
  • Loading branch information
strongduanmu authored Nov 27, 2024
1 parent c42d5ec commit 6813dfe
Show file tree
Hide file tree
Showing 24 changed files with 395 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public final class BroadcastSQLRouter implements EntranceSQLRouter<BroadcastRule

@Override
public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData,
final ShardingSphereDatabase database, final BroadcastRule rule, final ConfigurationProperties props) {
final ShardingSphereDatabase database, final BroadcastRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
return BroadcastRouteEngineFactory.newInstance(rule, database, queryContext).route(new RouteContext(), rule);
}

@Override
public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext,
final ShardingSphereDatabase database, final BroadcastRule rule, final ConfigurationProperties props) {
final ShardingSphereDatabase database, final BroadcastRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
if (sqlStatement instanceof TCLStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void assertCreateRouteContext() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
BroadcastRouteEngine routeEngine = mock(BroadcastRouteEngine.class);
when(BroadcastRouteEngineFactory.newInstance(rule, database, queryContext)).thenReturn(routeEngine);
getSQLRouter(rule).createRouteContext(queryContext, mock(RuleMetaData.class), database, rule, new ConfigurationProperties(new Properties()));
getSQLRouter(rule).createRouteContext(queryContext, mock(RuleMetaData.class), database, rule, Collections.emptyList(), new ConfigurationProperties(new Properties()));
verify(routeEngine).route(any(), eq(rule));
}

Expand All @@ -86,7 +86,7 @@ void assertDecorateBroadcastRouteContextWithSingleDataSource() {
RouteContext routeContext = new RouteContext();
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper("foo_ds", "foo_ds"), Lists.newArrayList()));
BroadcastSQLRouter sqlRouter = getSQLRouter(rule);
sqlRouter.decorateRouteContext(routeContext, createQueryContext(), mockSingleDatabase(), rule, new ConfigurationProperties(new Properties()));
sqlRouter.decorateRouteContext(routeContext, createQueryContext(), mockSingleDatabase(), rule, Collections.emptyList(), new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = routeContext.getActualDataSourceNames().iterator();
assertThat(routedDataSourceNames.next(), is("foo_ds"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class ReadwriteSplittingSQLRouter implements DecorateSQLRouter<Read

@Override
public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database,
final ReadwriteSplittingRule rule, final ConfigurationProperties props) {
final ReadwriteSplittingRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
for (RouteUnit each : routeContext.getRouteUnits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void assertDecorateRouteContextToPrimaryDataSource() {
RuleMetaData ruleMetaData = new RuleMetaData(Collections.singleton(staticRule));
ShardingSphereDatabase database = new ShardingSphereDatabase(DefaultDatabase.LOGIC_NAME,
mock(DatabaseType.class), mock(ResourceMetaData.class, RETURNS_DEEP_STUBS), ruleMetaData, Collections.emptyMap());
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, new ConfigurationProperties(new Properties()));
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, Collections.emptyList(), new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
assertThat(routedDataSourceNames.next(), is(NONE_READWRITE_SPLITTING_DATASOURCE_NAME));
assertThat(routedDataSourceNames.next(), is(WRITE_DATASOURCE));
Expand All @@ -111,7 +111,7 @@ void assertDecorateRouteContextToReplicaDataSource() {
ShardingSphereDatabase database = new ShardingSphereDatabase(DefaultDatabase.LOGIC_NAME,
mock(DatabaseType.class), mock(ResourceMetaData.class, RETURNS_DEEP_STUBS), ruleMetaData, Collections.emptyMap());
RouteContext actual = mockRouteContext();
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, new ConfigurationProperties(new Properties()));
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, Collections.emptyList(), new ConfigurationProperties(new Properties()));
assertThat(actual.getActualDataSourceNames(), is(new HashSet<>(Arrays.asList(NONE_READWRITE_SPLITTING_DATASOURCE_NAME, READ_DATASOURCE))));
}

Expand All @@ -125,7 +125,7 @@ void assertDecorateRouteContextToPrimaryDataSourceWithLock() {
RuleMetaData ruleMetaData = new RuleMetaData(Collections.singleton(staticRule));
ShardingSphereDatabase database = new ShardingSphereDatabase(DefaultDatabase.LOGIC_NAME,
mock(DatabaseType.class), mock(ResourceMetaData.class, RETURNS_DEEP_STUBS), ruleMetaData, Collections.emptyMap());
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, new ConfigurationProperties(new Properties()));
sqlRouter.decorateRouteContext(actual, queryContext, database, staticRule, Collections.emptyList(), new ConfigurationProperties(new Properties()));
Iterator<String> routedDataSourceNames = actual.getActualDataSourceNames().iterator();
assertThat(routedDataSourceNames.next(), is(NONE_READWRITE_SPLITTING_DATASOURCE_NAME));
assertThat(routedDataSourceNames.next(), is(WRITE_DATASOURCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class ShadowSQLRouter implements DecorateSQLRouter<ShadowRule>, Dat

@Override
public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database,
final ShadowRule rule, final ConfigurationProperties props) {
final ShadowRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
Collection<RouteUnit> toBeRemovedRouteUnit = new LinkedList<>();
Collection<RouteUnit> toBeAddedRouteUnit = new LinkedList<>();
Map<String, String> shadowDataSourceMappings = ShadowDataSourceMappingsRetrieverFactory.newInstance(queryContext).retrieve(rule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.shardingsphere.sharding.rule.ShardingRule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

Expand All @@ -45,11 +46,12 @@ public final class CachedShardingSQLRouter {
* @param globalRuleMetaData global rule meta data
* @param database database
* @param shardingCache sharding cache
* @param tableNames table names
* @param props configuration properties
* @return route context
*/
public Optional<RouteContext> loadRouteContext(final OriginSQLRouter originSQLRouter, final QueryContext queryContext, final RuleMetaData globalRuleMetaData,
final ShardingSphereDatabase database, final ShardingCache shardingCache, final ConfigurationProperties props) {
final ShardingSphereDatabase database, final ShardingCache shardingCache, final Collection<String> tableNames, final ConfigurationProperties props) {
if (queryContext.getSql().length() > shardingCache.getConfiguration().getAllowedMaxSqlLength()) {
return Optional.empty();
}
Expand All @@ -66,7 +68,7 @@ public Optional<RouteContext> loadRouteContext(final OriginSQLRouter originSQLRo
}
Optional<RouteContext> cachedResult = shardingCache.getRouteCache().get(new ShardingRouteCacheKey(queryContext.getSql(), shardingConditionParams))
.flatMap(ShardingRouteCacheValue::getCachedRouteContext);
RouteContext result = cachedResult.orElseGet(() -> originSQLRouter.createRouteContext(queryContext, globalRuleMetaData, database, shardingCache.getShardingRule(), props));
RouteContext result = cachedResult.orElseGet(() -> originSQLRouter.createRouteContext(queryContext, globalRuleMetaData, database, shardingCache.getShardingRule(), tableNames, props));
if (!cachedResult.isPresent() && hitOneShardOnly(result)) {
shardingCache.getRouteCache().put(new ShardingRouteCacheKey(queryContext.getSql(), shardingConditionParams), new ShardingRouteCacheValue(result));
}
Expand All @@ -88,9 +90,11 @@ public interface OriginSQLRouter {
* @param globalRuleMetaData global rule meta data
* @param database database
* @param rule rule
* @param tableNames table names
* @param props configuration properties
* @return route context
*/
RouteContext createRouteContext(QueryContext queryContext, RuleMetaData globalRuleMetaData, ShardingSphereDatabase database, ShardingRule rule, ConfigurationProperties props);
RouteContext createRouteContext(QueryContext queryContext, RuleMetaData globalRuleMetaData, ShardingSphereDatabase database, ShardingRule rule, Collection<String> tableNames,
ConfigurationProperties props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.DMLStatement;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -50,27 +51,31 @@ public final class ShardingSQLRouter implements EntranceSQLRouter<ShardingRule>,

@Override
public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database,
final ShardingRule rule, final ConfigurationProperties props) {
final ShardingRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
if (rule.isShardingCacheEnabled()) {
Optional<RouteContext> result = new CachedShardingSQLRouter()
.loadRouteContext(this::createRouteContext0, queryContext, globalRuleMetaData, database, rule.getShardingCache(), props);
.loadRouteContext(this::createRouteContext0, queryContext, globalRuleMetaData, database, rule.getShardingCache(), tableNames, props);
if (result.isPresent()) {
return result.get();
}
}
return createRouteContext0(queryContext, globalRuleMetaData, database, rule, props);
return createRouteContext0(queryContext, globalRuleMetaData, database, rule, tableNames, props);
}

private RouteContext createRouteContext0(final QueryContext queryContext, final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database, final ShardingRule rule,
final ConfigurationProperties props) {
final Collection<String> tableNames, final ConfigurationProperties props) {
Collection<String> logicTableNames = rule.getShardingLogicTableNames(tableNames);
if (logicTableNames.isEmpty()) {
return new RouteContext();
}
SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
ShardingConditions shardingConditions = createShardingConditions(queryContext, globalRuleMetaData, database, rule);
Optional<ShardingStatementValidator> validator = ShardingStatementValidatorFactory.newInstance(sqlStatement, shardingConditions);
validator.ifPresent(optional -> optional.preValidate(rule, queryContext.getSqlStatementContext(), queryContext.getHintValueContext(), queryContext.getParameters(), database, props));
if (sqlStatement instanceof DMLStatement && shardingConditions.isNeedMerge()) {
shardingConditions.merge();
}
RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, queryContext, shardingConditions, props).route(rule);
RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, queryContext, shardingConditions, logicTableNames, props).route(rule);
validator.ifPresent(optional -> optional.postValidate(rule, queryContext.getSqlStatementContext(), queryContext.getHintValueContext(), queryContext.getParameters(), database, props, result));
return result;
}
Expand Down
Loading

0 comments on commit 6813dfe

Please sign in to comment.