diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java index 70ffc7bc4a896..fe4d4b95f878e 100644 --- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java +++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java @@ -18,19 +18,20 @@ package org.apache.shardingsphere.readwritesplitting.cluster; import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import lombok.Setter; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode; +import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber; import org.apache.shardingsphere.mode.event.deliver.QualifiedDataSourceDeletedEvent; import org.apache.shardingsphere.mode.spi.PersistRepository; /** * Readwrite-splitting qualified data source deleted subscriber. */ -@RequiredArgsConstructor -public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements EventSubscriber { +@Setter +public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements DeliverEventSubscriber { - private final PersistRepository repository; + private PersistRepository repository; /** * Delete qualified data source. @@ -41,4 +42,8 @@ public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implem public void delete(final QualifiedDataSourceDeletedEvent event) { repository.delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource())); } + + @Override + public void setEventBusContext(final EventBusContext eventBusContext) { + } } diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java deleted file mode 100644 index f2de2fff287fc..0000000000000 --- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.readwritesplitting.cluster; - -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory; -import org.apache.shardingsphere.mode.spi.PersistRepository; - -/** - * Readwrite-splitting qualified data source deleted subscriber factory. - */ -public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory implements DeliverEventSubscriberFactory { - - @Override - public EventSubscriber create(final PersistRepository repository, final EventBusContext eventBusContext) { - return new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository); - } -} diff --git a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber similarity index 94% rename from features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory rename to features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber index 3b42a3fe23eb9..01fd8f0abad49 100644 --- a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory +++ b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory +org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriber diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java index f3994bd0aa43e..9f25aea262fa0 100644 --- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java +++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java @@ -38,7 +38,8 @@ class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest { @BeforeEach void setUp() { - subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository); + subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(); + subscriber.setRepository(repository); } @Test diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java similarity index 79% rename from mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java rename to mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java index a43902000c2e6..479f027f4d999 100644 --- a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java @@ -26,14 +26,19 @@ * Deliver event subscriber factory. */ @SingletonSPI -public interface DeliverEventSubscriberFactory { +public interface DeliverEventSubscriber extends EventSubscriber { /** - * Create deliver event subscriber. - * - * @param repository cluster persist repository + * Set persist repository. + * + * @param repository persist repository + */ + void setRepository(PersistRepository repository); + + /** + * Set event bus context. + * * @param eventBusContext event bus context - * @return created event subscriber */ - EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext); + void setEventBusContext(EventBusContext eventBusContext); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index ebe7b89f7a9db..a325999b198c2 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -26,13 +26,14 @@ import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.metadata.persist.MetaDataPersistService; -import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry; +import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber; import org.apache.shardingsphere.mode.lock.GlobalLockContext; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.ContextManagerBuilder; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; -import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory; +import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry; import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException; @@ -45,7 +46,7 @@ import java.sql.SQLException; import java.util.Collection; -import java.util.stream.Collectors; +import java.util.LinkedList; /** * Cluster context manager builder. @@ -80,8 +81,7 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register(); ClusterEventSubscriberRegistry eventSubscriberRegistry = new ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext()); - eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream() - .map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList())); + eventSubscriberRegistry.register(createDeliverEventSubscribers(contextManager, repository)); eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers()); } @@ -91,6 +91,16 @@ private Collection getDatabaseNames(final ContextManagerBuilderParameter : metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames(); } + private Collection createDeliverEventSubscribers(final ContextManager contextManager, final ClusterPersistRepository repository) { + Collection result = new LinkedList<>(); + for (DeliverEventSubscriber each : ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriber.class)) { + each.setRepository(repository); + each.setEventBusContext(contextManager.getComputeNodeInstanceContext().getEventBusContext()); + result.add(each); + } + return result; + } + @Override public String getType() { return "Cluster";