Skip to content

Commit

Permalink
Add DeliverEventSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 15, 2024
1 parent 62ef6cd commit 189cf62
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
package org.apache.shardingsphere.readwritesplitting.cluster;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.event.deliver.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Readwrite-splitting qualified data source deleted subscriber.
*/
@RequiredArgsConstructor
public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements EventSubscriber {
@Setter
public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements DeliverEventSubscriber {

private final PersistRepository repository;
private PersistRepository repository;

/**
* Delete qualified data source.
Expand All @@ -41,4 +42,8 @@ public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implem
public void delete(final QualifiedDataSourceDeletedEvent event) {
repository.delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource()));
}

@Override
public void setEventBusContext(final EventBusContext eventBusContext) {
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory
org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriber
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest {

@BeforeEach
void setUp() {
subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository);
subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber();
subscriber.setRepository(repository);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@
* Deliver event subscriber factory.
*/
@SingletonSPI
public interface DeliverEventSubscriberFactory {
public interface DeliverEventSubscriber extends EventSubscriber {

/**
* Create deliver event subscriber.
*
* @param repository cluster persist repository
* Set persist repository.
*
* @param repository persist repository
*/
void setRepository(PersistRepository repository);

/**
* Set event bus context.
*
* @param eventBusContext event bus context
* @return created event subscriber
*/
EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext);
void setEventBusContext(EventBusContext eventBusContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
Expand All @@ -45,7 +46,7 @@

import java.sql.SQLException;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.LinkedList;

/**
* Cluster context manager builder.
Expand Down Expand Up @@ -80,8 +81,7 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
ClusterEventSubscriberRegistry eventSubscriberRegistry = new ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
.map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
eventSubscriberRegistry.register(createDeliverEventSubscribers(contextManager, repository));
eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
}

Expand All @@ -91,6 +91,16 @@ private Collection<String> getDatabaseNames(final ContextManagerBuilderParameter
: metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames();
}

private Collection<EventSubscriber> createDeliverEventSubscribers(final ContextManager contextManager, final ClusterPersistRepository repository) {
Collection<EventSubscriber> result = new LinkedList<>();
for (DeliverEventSubscriber each : ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriber.class)) {
each.setRepository(repository);
each.setEventBusContext(contextManager.getComputeNodeInstanceContext().getEventBusContext());
result.add(each);
}
return result;
}

@Override
public String getType() {
return "Cluster";
Expand Down

0 comments on commit 189cf62

Please sign in to comment.