From e1e3708bd3c6a440ea293545e63abff60ac14bb9 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 12 Dec 2024 22:59:38 +0800 Subject: [PATCH] Refactor ClusterContextManagerBuilder --- .../infra/config/mode/ModeConfigurationTest.java | 8 ++++---- .../cluster/ClusterContextManagerBuilder.java | 16 +++++++++------- .../ClusterDeliverEventSubscriberRegistry.java | 5 ++--- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/config/mode/ModeConfigurationTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/config/mode/ModeConfigurationTest.java index f936d2c5e1787..03eb1c3cb6e93 100644 --- a/infra/common/src/test/java/org/apache/shardingsphere/infra/config/mode/ModeConfigurationTest.java +++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/config/mode/ModeConfigurationTest.java @@ -26,9 +26,9 @@ class ModeConfigurationTest { @Test void assertIsCluster() { - ModeConfiguration standaloneModeConfiguration = new ModeConfiguration("Standalone", null); - assertFalse(standaloneModeConfiguration.isCluster()); - ModeConfiguration clusterModeConfiguration = new ModeConfiguration("Cluster", null); - assertTrue(clusterModeConfiguration.isCluster()); + ModeConfiguration standaloneModeConfig = new ModeConfiguration("Standalone", null); + assertFalse(standaloneModeConfig.isCluster()); + ModeConfiguration clusterModeConfig = new ModeConfiguration("Cluster", null); + assertTrue(clusterModeConfig.isCluster()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 461e7ac6bdca4..3723eadcf6e41 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -55,28 +55,30 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev ModeConfiguration modeConfig = param.getModeConfiguration(); ClusterPersistRepositoryConfiguration config = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository(); ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData(), param.getLabels()), modeConfig, eventBusContext); - ClusterPersistRepository repository = getClusterPersistRepository(config); - repository.init(config, computeNodeInstanceContext); + ClusterPersistRepository repository = getClusterPersistRepository(config, computeNodeInstanceContext); LockContext lockContext = new GlobalLockContext(new GlobalLockPersistService(repository)); computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()), lockContext); MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository); MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext); ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository); - registerOnline(computeNodeInstanceContext, param, result); + registerOnline(computeNodeInstanceContext, param, result, repository); return result; } - private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config) { + private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { ShardingSpherePreconditions.checkNotNull(config, MissingRequiredClusterRepositoryConfigurationException::new); - return TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps()); + ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps()); + result.init(config, computeNodeInstanceContext); + return result; } - private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager) { + private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager, + final ClusterPersistRepository repository) { contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance()); contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register(); EventSubscriberRegistry eventSubscriberRegistry = new EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext()); - eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(contextManager).getSubscribers()); + eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(repository).getSubscribers()); eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java index 06ce77c27b084..490dc709411d9 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java @@ -19,7 +19,6 @@ import lombok.Getter; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import java.util.Collection; @@ -33,7 +32,7 @@ public final class ClusterDeliverEventSubscriberRegistry { private final Collection subscribers; - public ClusterDeliverEventSubscriberRegistry(final ContextManager contextManager) { - subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber((ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository())); + public ClusterDeliverEventSubscriberRegistry(final ClusterPersistRepository repository) { + subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber(repository)); } }