Skip to content

Commit

Permalink
[ISSUE #4788] Support disruptor as memory queue (#4844)
Browse files Browse the repository at this point in the history
* [ISSUE #4788] Support disruptor as memory queue

* [ISSUE #4788] fix code style

---------

Co-authored-by: JiangShuJu <[email protected]>
  • Loading branch information
jevinjiang and JiangShuJu authored Jul 15, 2024
1 parent 015b6e9 commit 50a36aa
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation "com.lmax:disruptor"

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
import org.apache.eventmesh.storage.standalone.broker.Channel;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;

Expand All @@ -42,11 +42,11 @@ public StandaloneAdmin() {

@Override
public List<TopicProperties> getTopic() throws Exception {
ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
ConcurrentHashMap<TopicMetadata, Channel> messageContainer = this.standaloneBroker.getMessageContainer();
List<TopicProperties> topicList = new ArrayList<>();
messageContainer.keySet().forEach(topicMetadata -> {
MessageQueue messageQueue = messageContainer.get(topicMetadata);
final int messageCount = messageQueue.getPutIndex() - messageQueue.getTakeIndex();
Channel channel = messageContainer.get(topicMetadata);
final int messageCount = channel.getMessageCount();
topicList.add(new TopicProperties(
topicMetadata.getTopicName(),
messageCount));
Expand All @@ -65,25 +65,7 @@ public void deleteTopic(String topicName) {
standaloneBroker.deleteTopicIfExist(topicName);
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
if (!this.standaloneBroker.checkTopicExist(topicName)) {
throw new Exception("The topic name doesn't exist in the message queue");
}
ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
long topicOffset = messageContainer.get(new TopicMetadata(topicName)).getTakeIndex();

List<CloudEvent> messageList = new ArrayList<>();
for (int index = 0; index < length; index++) {
long messageOffset = topicOffset + offset + index;
CloudEvent event = this.standaloneBroker.getMessage(topicName, messageOffset);
if (event == null) {
break;
}
messageList.add(event);
}
return messageList;
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.standalone.broker;

import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.storage.standalone.broker.provider.DisruptorProvider;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import lombok.Getter;


public class Channel implements LifeCycle {

public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
@Getter
private DisruptorProvider provider;
private final Integer size;
private final EventHandler<MessageEntity> eventHandler;
private volatile boolean started = false;
private final TopicMetadata topic;
private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";

public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
this(DEFAULT_SIZE, topic, eventHandler);
}


public Channel(final Integer ringBufferSize, final TopicMetadata topic, final EventHandler<MessageEntity> eventHandler) {
this.size = ringBufferSize;
this.topic = topic;
this.eventHandler = eventHandler;
}


@Override
public boolean isStarted() {
return started;
}

@Override
public boolean isClosed() {
return !isStarted();
}

public synchronized void start() {
if (isClosed()) {
doStart();
started = true;
}
}

public void doStart() {
Disruptor<MessageEntity> disruptor = new Disruptor<>(
MessageEntity::new,
size,
new EventMeshThreadFactory(THREAD_NAME_PREFIX + topic.getTopicName(), true),
ProducerType.MULTI,
new BlockingWaitStrategy()
);

disruptor.handleEventsWith(eventHandler);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer();
provider = new DisruptorProvider(ringBuffer, disruptor);
provider.start();
}

public int getMessageCount() {
return provider.getMessageCount();
}

@Override
public synchronized void shutdown() {
if (isStarted()) {
provider.shutdown();
provider = null;
started = false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,75 @@

import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClear;
import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClearTask;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import io.cloudevents.CloudEvent;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* This broker used to store event, it just support standalone mode, you shouldn't use this module in production environment
*/
@Slf4j
public class StandaloneBroker {

private final ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer;
// message source by topic
@Getter
private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer;

// todo: move the offset manage to consumer
private final ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap;
@Getter
private final ConcurrentHashMap<TopicMetadata, Subscribe> subscribeContainer;

private StandaloneBroker() {
this.messageContainer = new ConcurrentHashMap<>();
this.offsetMap = new ConcurrentHashMap<>();
startHistoryMessageCleanTask();
}

public ConcurrentHashMap<TopicMetadata, MessageQueue> getMessageContainer() {
return this.messageContainer;
this.subscribeContainer = new ConcurrentHashMap<>();
}

public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
return this.offsetMap;
}

public static StandaloneBroker getInstance() {
return StandaloneBrokerInstanceHolder.instance;
return StandaloneBrokerInstanceHolder.INSTANCE;
}

/**
* put message
*
* @param topicName topic name
* @param message message
* @throws InterruptedException
*/
public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException {
Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
AtomicLong topicOffset = pair.getRight();
MessageQueue messageQueue = pair.getLeft();

MessageEntity messageEntity = new MessageEntity(
new TopicMetadata(topicName), message, topicOffset.getAndIncrement(), System.currentTimeMillis());
messageQueue.put(messageEntity);

public MessageEntity putMessage(String topicName, CloudEvent message) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (!messageContainer.containsKey(topicMetadata)) {
createTopic(topicName);
}
Channel channel = messageContainer.get(topicMetadata);
MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
channel.getProvider().onData(messageEntity);
return messageEntity;
}

public Channel createTopic(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> {
Subscribe subscribe = subscribeContainer.get(topicMetadata);
if (subscribe == null) {
throw new IllegalStateException("the topic not exist subscribe ");
}
Channel channel = new Channel(topicMetadata, subscribe);
channel.start();
return channel;
});
}

/**
* Get the message, if the queue is empty then await
*
* @param topicName
*/
public CloudEvent takeMessage(String topicName) throws InterruptedException {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).take().getMessage();
return null;
}

/**
Expand All @@ -92,12 +96,7 @@ public CloudEvent takeMessage(String topicName) throws InterruptedException {
* @param topicName
*/
public CloudEvent getMessage(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getHead();
if (head == null) {
return null;
}
return head.getMessage();
return null;
}

/**
Expand All @@ -108,21 +107,9 @@ public CloudEvent getMessage(String topicName) {
* @return CloudEvent
*/
public CloudEvent getMessage(String topicName, long offset) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity messageEntity = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getByOffset(offset);
if (messageEntity == null) {
return null;
}
return messageEntity.getMessage();
return null;
}

private void startHistoryMessageCleanTask() {
HistoryMessageClear historyMessageClear = new HistoryMessageClear(messageContainer);
Thread thread = new Thread(new HistoryMessageClearTask(historyMessageClear));
thread.setDaemon(true);
thread.setName("StandaloneBroker-HistoryMessageCleanTask");
thread.start();
}

public boolean checkTopicExist(String topicName) {
return messageContainer.containsKey(new TopicMetadata(topicName));
Expand All @@ -132,13 +119,10 @@ public boolean checkTopicExist(String topicName) {
* if the topic does not exist, create the topic
*
* @param topicName topicName
* @return messageQueue and offset
* @return Channel
*/
public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageQueue messageQueue = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue());
AtomicLong offset = offsetMap.computeIfAbsent(topicMetadata, k -> new AtomicLong());
return Pair.of(messageQueue, offset);
public Channel createTopicIfAbsent(String topicName) {
return createTopic(topicName);
}

/**
Expand All @@ -148,18 +132,23 @@ public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String topicName) {
*/
public void deleteTopicIfExist(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
Channel channel = createTopicIfAbsent(topicName);
channel.shutdown();
messageContainer.remove(topicMetadata);
}

public void updateOffset(TopicMetadata topicMetadata, long offset) {
offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
v.set(offset);
return v;
});
public void subscribed(String topicName, Subscribe subscribe) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (getMessageContainer().containsKey(topicMetadata)) {
log.warn("the topic already subscribed");
return;
}
subscribeContainer.put(topicMetadata, subscribe);
}


private static class StandaloneBrokerInstanceHolder {

private static final StandaloneBroker instance = new StandaloneBroker();
private static final StandaloneBroker INSTANCE = new StandaloneBroker();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import io.cloudevents.CloudEvent;

import lombok.NoArgsConstructor;

@NoArgsConstructor
public class MessageEntity implements Serializable {

private static final long serialVersionUID = 6646148767540524786L;
Expand All @@ -40,6 +43,11 @@ public MessageEntity(TopicMetadata topicMetadata, CloudEvent message, long offse
this.createTimeMills = currentTimeMills;
}

public MessageEntity(TopicMetadata topicMetadata, CloudEvent message) {
this.topicMetadata = topicMetadata;
this.message = message;
}

public TopicMetadata getTopicMetadata() {
return topicMetadata;
}
Expand Down
Loading

0 comments on commit 50a36aa

Please sign in to comment.