Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

RSocket Broker CloudEvents

linux_china edited this page Feb 6, 2021 · 8 revisions

事件广播是RSocket Broker设计中一个非常重要的特性,目前主要包括以下内容:

  • 应用广播: 对所有接入到RSocket Broker的应用进行事件广播
  • 集群内部广播: RSocket Broker之间的事件广播
  • 基于CloudEvents的JSON数据格式: 基于CloudEvents规范,数据格式为JSON
  • 通过RSocket协议的metadataPush进行通讯: metadataPush是RSocket协议事件推送标准接口

应用广播

应用广播是Broker多所有接入的应用进行事件推送,包含以下几种模式:

  • 一对一定点推送: 给特定的应用实例发送事件,如在做变更测试等
  • 一对广播: 主要是通知某一应用的所有实例,如某一应用配置发生变化,你可以使用该接口进行配置推送
  • 全体广播: 给接入的所有应用实例发送事件广播,如broker集群变更等。

基于是应用广播的核心场景主要包括:

  • 配置推送: 通过Broker完成应用的配置信息推送
  • 集群变更: 集群发生变更时,会给所有接入的应用发送通知,应用会基于新的集群拓扑结构进行重连,保证服务的发布和服务调用的顺利
  • 应用上下线: 应用侧也可以发送事件给broker集群,保证应用的优雅上下线
  • 自定义事件通知: 根据你的实际业务场景进行自定义事件推送

对应的Java接口如下:

public interface BroadcastSpread {

    Mono<Void> send(String appUUID, final CloudEventImpl cloudEvent);

    Mono<Void> broadcast(String appName, final CloudEventImpl cloudEvent);

    Mono<Void> broadcastAll(CloudEventImpl cloudEvent);

    default CloudEventImpl<Map<String, Object>> buildMapCloudEvent(@NotNull String type, @NotNull String subject, @NotNull Map<String, Object> data) {
        return CloudEventBuilder.builder(data)
                .withSource(BrokerAppContext.identity())
                .withSubject(subject)
                .build();
    }
}

考虑到方便应用接入端的统一处理,目前CloudEvents只支持JSON格式,你可以使用基于Map的CloudEvent或者你自己构建的事件对象,这个都是支持的。

应用端如何接入事件消费

所有的CloudEvents事件都由reactiveCloudEventProcessor和CloudEventsProcessor这两个Spring Bean管理,你只需要创建一个CloudEventsConsumer,然后基于事件类型进行逻辑处理即可。 如处理Cache失效Event的样例代码如下:

@Component
public class InvalidCacheEventConsumer implements CloudEventsConsumer {
    @Autowired(required = false)
    private CacheManager cacheManager;

    @Override
    public boolean shouldAccept(CloudEventImpl<?> cloudEvent) {
        String type = cloudEvent.getAttributes().getType();
        return cacheManager != null && InvalidCacheEvent.class.getCanonicalName().equalsIgnoreCase(type);
    }

    @Override
    public Mono<Void> accept(CloudEventImpl<?> cloudEvent) {
        return Mono.fromRunnable(() -> {
           // logic here
        });
    }
}

此外你还可以基于Spring标准的EventListener来处理CloudEvent,样例代码如下:

    @EventListener
    public Mono<Void> handleCloudEvent(final CloudEventImpl<?> event) {
         return Mono.fromRunnable(() -> {
           System.out.println("Event: " + event.getCloudEvent().getId());
        });
    }

RSocket Broker集群广播

目前RSocket Broker采用gossip进行集群内消息广播,如统一设置过滤器规则、调整应用权重等,你可以根据实际的业务需求进行扩展。

interface RSocketBrokerManager  {
    Mono<String> broadcast(CloudEventImpl<?> cloudEvent);
}

注意: RSocket Broker集群内的服务器上下线,这个不在广播范畴内。

RSocket Gossip Broker集群内部的RPC

在某些特殊的情况下,一个broker可能需要访问另外一个broker获取相关的信息,如broke的配置,当前metrics等等, 这个时候我们可以使用 "cluster.requestResponse()" 发起对某一broker实例的调用。考虑到调用的简洁性,我们采用json-rpc规范实现broker到broker的调用。 相信请参考Broker Cluster的gossip实现。

References

RSocket

Network Protocol

  • Binary: byte stream
  • Async message
  • Multi transports
  • Reactive Semantics

Symmetric interactions

  • request/response
  • request/stream
  • fire-and-forget
  • channel

Transports

  • TCP+TLS
  • WebSocket+TLS
  • UDP(Aeron)
  • RDMA

Polyglot

Clone this wiki locally