diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle index a8b1827aa7..22271fb57d 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation project(":eventmesh-common") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") + implementation "com.lmax:disruptor" compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java index 7f5ab2da67..72257647ad 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java @@ -19,7 +19,7 @@ import org.apache.eventmesh.api.admin.AbstractAdmin; import org.apache.eventmesh.api.admin.TopicProperties; -import org.apache.eventmesh.storage.standalone.broker.MessageQueue; +import org.apache.eventmesh.storage.standalone.broker.Channel; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; @@ -42,11 +42,11 @@ public StandaloneAdmin() { @Override public List getTopic() throws Exception { - ConcurrentHashMap messageContainer = this.standaloneBroker.getMessageContainer(); + ConcurrentHashMap messageContainer = this.standaloneBroker.getMessageContainer(); List topicList = new ArrayList<>(); messageContainer.keySet().forEach(topicMetadata -> { - MessageQueue messageQueue = messageContainer.get(topicMetadata); - final int messageCount = messageQueue.getPutIndex() - messageQueue.getTakeIndex(); + Channel channel = messageContainer.get(topicMetadata); + final int messageCount = channel.getMessageCount(); topicList.add(new TopicProperties( topicMetadata.getTopicName(), messageCount)); @@ -65,25 +65,7 @@ public void deleteTopic(String topicName) { standaloneBroker.deleteTopicIfExist(topicName); } - @Override - public List getEvent(String topicName, int offset, int length) throws Exception { - if (!this.standaloneBroker.checkTopicExist(topicName)) { - throw new Exception("The topic name doesn't exist in the message queue"); - } - ConcurrentHashMap messageContainer = this.standaloneBroker.getMessageContainer(); - long topicOffset = messageContainer.get(new TopicMetadata(topicName)).getTakeIndex(); - List messageList = new ArrayList<>(); - for (int index = 0; index < length; index++) { - long messageOffset = topicOffset + offset + index; - CloudEvent event = this.standaloneBroker.getMessage(topicName, messageOffset); - if (event == null) { - break; - } - messageList.add(event); - } - return messageList; - } @Override public void publish(CloudEvent cloudEvent) throws Exception { diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java new file mode 100644 index 0000000000..2ea7310b83 --- /dev/null +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.storage.standalone.broker; + +import org.apache.eventmesh.api.LifeCycle; +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; +import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; +import org.apache.eventmesh.storage.standalone.broker.provider.DisruptorProvider; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.IgnoreExceptionHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +import lombok.Getter; + + +public class Channel implements LifeCycle { + + public static final Integer DEFAULT_SIZE = 4096 << 1 << 1; + @Getter + private DisruptorProvider provider; + private final Integer size; + private final EventHandler eventHandler; + private volatile boolean started = false; + private final TopicMetadata topic; + private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_"; + + public Channel(TopicMetadata topic, EventHandler eventHandler) { + this(DEFAULT_SIZE, topic, eventHandler); + } + + + public Channel(final Integer ringBufferSize, final TopicMetadata topic, final EventHandler eventHandler) { + this.size = ringBufferSize; + this.topic = topic; + this.eventHandler = eventHandler; + } + + + @Override + public boolean isStarted() { + return started; + } + + @Override + public boolean isClosed() { + return !isStarted(); + } + + public synchronized void start() { + if (isClosed()) { + doStart(); + started = true; + } + } + + public void doStart() { + Disruptor disruptor = new Disruptor<>( + MessageEntity::new, + size, + new EventMeshThreadFactory(THREAD_NAME_PREFIX + topic.getTopicName(), true), + ProducerType.MULTI, + new BlockingWaitStrategy() + ); + + disruptor.handleEventsWith(eventHandler); + disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); + RingBuffer ringBuffer = disruptor.getRingBuffer(); + provider = new DisruptorProvider(ringBuffer, disruptor); + provider.start(); + } + + public int getMessageCount() { + return provider.getMessageCount(); + } + + @Override + public synchronized void shutdown() { + if (isStarted()) { + provider.shutdown(); + provider = null; + started = false; + } + } + +} \ No newline at end of file diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java index 5e64b40a70..8654b2d1c3 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java @@ -19,42 +19,36 @@ import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; -import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClear; -import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClearTask; - -import org.apache.commons.lang3.tuple.Pair; +import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import io.cloudevents.CloudEvent; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + /** * This broker used to store event, it just support standalone mode, you shouldn't use this module in production environment */ +@Slf4j public class StandaloneBroker { - private final ConcurrentHashMap messageContainer; + // message source by topic + @Getter + private final ConcurrentHashMap messageContainer; - // todo: move the offset manage to consumer - private final ConcurrentHashMap offsetMap; + @Getter + private final ConcurrentHashMap subscribeContainer; private StandaloneBroker() { this.messageContainer = new ConcurrentHashMap<>(); - this.offsetMap = new ConcurrentHashMap<>(); - startHistoryMessageCleanTask(); - } - - public ConcurrentHashMap getMessageContainer() { - return this.messageContainer; + this.subscribeContainer = new ConcurrentHashMap<>(); } - public ConcurrentHashMap getOffsetMap() { - return this.offsetMap; - } public static StandaloneBroker getInstance() { - return StandaloneBrokerInstanceHolder.instance; + return StandaloneBrokerInstanceHolder.INSTANCE; } /** @@ -62,28 +56,38 @@ public static StandaloneBroker getInstance() { * * @param topicName topic name * @param message message - * @throws InterruptedException */ - public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException { - Pair pair = createTopicIfAbsent(topicName); - AtomicLong topicOffset = pair.getRight(); - MessageQueue messageQueue = pair.getLeft(); - - MessageEntity messageEntity = new MessageEntity( - new TopicMetadata(topicName), message, topicOffset.getAndIncrement(), System.currentTimeMillis()); - messageQueue.put(messageEntity); - + public MessageEntity putMessage(String topicName, CloudEvent message) { + TopicMetadata topicMetadata = new TopicMetadata(topicName); + if (!messageContainer.containsKey(topicMetadata)) { + createTopic(topicName); + } + Channel channel = messageContainer.get(topicMetadata); + MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message); + channel.getProvider().onData(messageEntity); return messageEntity; } + public Channel createTopic(String topicName) { + TopicMetadata topicMetadata = new TopicMetadata(topicName); + return messageContainer.computeIfAbsent(topicMetadata, k -> { + Subscribe subscribe = subscribeContainer.get(topicMetadata); + if (subscribe == null) { + throw new IllegalStateException("the topic not exist subscribe "); + } + Channel channel = new Channel(topicMetadata, subscribe); + channel.start(); + return channel; + }); + } + /** * Get the message, if the queue is empty then await * * @param topicName */ public CloudEvent takeMessage(String topicName) throws InterruptedException { - TopicMetadata topicMetadata = new TopicMetadata(topicName); - return messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).take().getMessage(); + return null; } /** @@ -92,12 +96,7 @@ public CloudEvent takeMessage(String topicName) throws InterruptedException { * @param topicName */ public CloudEvent getMessage(String topicName) { - TopicMetadata topicMetadata = new TopicMetadata(topicName); - MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getHead(); - if (head == null) { - return null; - } - return head.getMessage(); + return null; } /** @@ -108,21 +107,9 @@ public CloudEvent getMessage(String topicName) { * @return CloudEvent */ public CloudEvent getMessage(String topicName, long offset) { - TopicMetadata topicMetadata = new TopicMetadata(topicName); - MessageEntity messageEntity = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getByOffset(offset); - if (messageEntity == null) { - return null; - } - return messageEntity.getMessage(); + return null; } - private void startHistoryMessageCleanTask() { - HistoryMessageClear historyMessageClear = new HistoryMessageClear(messageContainer); - Thread thread = new Thread(new HistoryMessageClearTask(historyMessageClear)); - thread.setDaemon(true); - thread.setName("StandaloneBroker-HistoryMessageCleanTask"); - thread.start(); - } public boolean checkTopicExist(String topicName) { return messageContainer.containsKey(new TopicMetadata(topicName)); @@ -132,13 +119,10 @@ public boolean checkTopicExist(String topicName) { * if the topic does not exist, create the topic * * @param topicName topicName - * @return messageQueue and offset + * @return Channel */ - public Pair createTopicIfAbsent(String topicName) { - TopicMetadata topicMetadata = new TopicMetadata(topicName); - MessageQueue messageQueue = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()); - AtomicLong offset = offsetMap.computeIfAbsent(topicMetadata, k -> new AtomicLong()); - return Pair.of(messageQueue, offset); + public Channel createTopicIfAbsent(String topicName) { + return createTopic(topicName); } /** @@ -148,18 +132,23 @@ public Pair createTopicIfAbsent(String topicName) { */ public void deleteTopicIfExist(String topicName) { TopicMetadata topicMetadata = new TopicMetadata(topicName); + Channel channel = createTopicIfAbsent(topicName); + channel.shutdown(); messageContainer.remove(topicMetadata); } - public void updateOffset(TopicMetadata topicMetadata, long offset) { - offsetMap.computeIfPresent(topicMetadata, (k, v) -> { - v.set(offset); - return v; - }); + public void subscribed(String topicName, Subscribe subscribe) { + TopicMetadata topicMetadata = new TopicMetadata(topicName); + if (getMessageContainer().containsKey(topicMetadata)) { + log.warn("the topic already subscribed"); + return; + } + subscribeContainer.put(topicMetadata, subscribe); } + private static class StandaloneBrokerInstanceHolder { - private static final StandaloneBroker instance = new StandaloneBroker(); + private static final StandaloneBroker INSTANCE = new StandaloneBroker(); } -} +} \ No newline at end of file diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java index 0f437aee04..3662b30255 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java @@ -21,6 +21,9 @@ import io.cloudevents.CloudEvent; +import lombok.NoArgsConstructor; + +@NoArgsConstructor public class MessageEntity implements Serializable { private static final long serialVersionUID = 6646148767540524786L; @@ -40,6 +43,11 @@ public MessageEntity(TopicMetadata topicMetadata, CloudEvent message, long offse this.createTimeMills = currentTimeMills; } + public MessageEntity(TopicMetadata topicMetadata, CloudEvent message) { + this.topicMetadata = topicMetadata; + this.message = message; + } + public TopicMetadata getTopicMetadata() { return topicMetadata; } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java new file mode 100644 index 0000000000..47b2665a2c --- /dev/null +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.storage.standalone.broker.provider; + +import org.apache.eventmesh.api.LifeCycle; +import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; + +import com.lmax.disruptor.EventTranslatorOneArg; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; + +import lombok.extern.slf4j.Slf4j; + +/** + * DisruptorProvider. disruptor provider definition. + */ +@Slf4j +public class DisruptorProvider implements LifeCycle { + + private final RingBuffer ringBuffer; + + private final Disruptor disruptor; + + private volatile boolean start = false; + + private final EventTranslatorOneArg translatorOneArg = (messageEntity, sequence, arg0) -> { + arg0.setOffset(sequence); + arg0.setCreateTimeMills(System.currentTimeMillis()); + messageEntity.setOffset(arg0.getOffset()); + messageEntity.setCreateTimeMills(arg0.getCreateTimeMills()); + messageEntity.setTopicMetadata(arg0.getTopicMetadata()); + messageEntity.setMessage(arg0.getMessage()); + }; + + + /** + * Instantiates a new Disruptor provider. + * + * @param ringBuffer the ring buffer + * @param disruptor the disruptor + */ + public DisruptorProvider(final RingBuffer ringBuffer, final Disruptor disruptor) { + this.ringBuffer = ringBuffer; + this.disruptor = disruptor; + } + + /** + * @param data the data + */ + public MessageEntity onData(final MessageEntity data) { + if (isClosed()) { + throw new IllegalArgumentException("the disruptor is close"); + } + try { + ringBuffer.publishEvent(translatorOneArg, data); + } catch (Exception ex) { + throw new IllegalStateException("send data fail."); + } + return data; + } + + + @Override + public boolean isStarted() { + return start; + } + + @Override + public boolean isClosed() { + return !isStarted(); + } + + @Override + public void start() { + if (null != disruptor) { + disruptor.start(); + start = true; + } + } + + /** + * Shutdown. + */ + public void shutdown() { + if (null != disruptor) { + disruptor.shutdown(); + start = false; + } + } + + public int getMessageCount() { + return ringBuffer.getBufferSize(); + } +} \ No newline at end of file diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java index 8316270adf..4c84849ac7 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java @@ -21,16 +21,19 @@ import org.apache.eventmesh.api.EventMeshAction; import org.apache.eventmesh.api.EventMeshAsyncConsumeContext; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; +import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; -import java.util.concurrent.atomic.AtomicInteger; import io.cloudevents.CloudEvent; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.WorkHandler; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j -public class Subscribe { +public class Subscribe implements WorkHandler, EventHandler { @Getter private final String topicName; @@ -38,8 +41,6 @@ public class Subscribe { private final EventListener listener; @Getter private volatile boolean isRunning; - @Getter - private AtomicInteger offset; public Subscribe(String topicName, StandaloneBroker standaloneBroker, @@ -51,52 +52,50 @@ public Subscribe(String topicName, } public void subscribe() { + standaloneBroker.subscribed(topicName, this); + } + + public void shutdown() { + isRunning = false; + standaloneBroker.deleteTopicIfExist(topicName); + } + + @Override + public void onEvent(MessageEntity event, long sequence, boolean endOfBatch) { + onEvent(event); + } + + @Override + public void onEvent(MessageEntity event) { try { - log.debug("execute subscribe task, topic: {}, offset: {}", topicName, offset); - if (offset == null) { - CloudEvent message = standaloneBroker.getMessage(topicName); - if (message != null) { - Object tmpOffset = message.getExtension("offset"); - if (tmpOffset instanceof Integer) { - offset = new AtomicInteger(Integer.parseInt(tmpOffset.toString())); - } else { - offset = new AtomicInteger(0); - } - } + if (!isRunning) { + return; } - if (offset != null) { - CloudEvent message = standaloneBroker.getMessage(topicName, offset.get()); - if (message != null) { - EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { + CloudEvent message = event.getMessage(); + if (message != null) { + EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { - @Override - public void commit(EventMeshAction action) { - switch (action) { - case CommitMessage: - // update offset - log.info("message commit, topic: {}, current offset:{}", topicName, offset.get()); - break; - case ManualAck: - // update offset - offset.incrementAndGet(); - log.info("message ack, topic: {}, current offset:{}", topicName, offset.get()); - break; - case ReconsumeLater: - default: - - } + @Override + public void commit(EventMeshAction action) { + switch (action) { + case CommitMessage: + // update offset + log.info("message commit, topic: {}, current offset:{}", topicName, event.getOffset()); + break; + case ManualAck: + // update offset + log.info("message ack, topic: {}, current offset:{}", topicName, event.getOffset()); + break; + case ReconsumeLater: + default: } - }; - listener.consume(message, consumeContext); - } + } + }; + listener.consume(message, consumeContext); } } catch (Exception ex) { - log.error("consumer error, topic: {}, offset: {}", topicName, offset == null ? null : offset.get(), ex); + log.error("consumer error, topic: {}, offset: {}", topicName, event.getOffset(), ex); } } - public void shutdown() { - isRunning = false; - } - -} +} \ No newline at end of file diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java deleted file mode 100644 index 0936c79257..0000000000 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.storage.standalone.broker.task; - -import org.apache.eventmesh.common.utils.ThreadUtils; - -import java.util.concurrent.TimeUnit; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class SubscribeTask implements Runnable { - - private Subscribe subscribe; - - public SubscribeTask(Subscribe subscribe) { - this.subscribe = subscribe; - } - - @Override - public void run() { - while (subscribe.isRunning()) { - subscribe.subscribe(); - try { - ThreadUtils.sleepWithThrowException(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.error("Thread is interrupted, topic: {}, offset: {} thread name: {}", - subscribe.getTopicName(), - subscribe.getOffset() == null ? null : subscribe.getOffset().get(), - Thread.currentThread().getName(), e); - Thread.currentThread().interrupt(); - } - } - } - -} diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java index 9eb753e3fa..edb66703f7 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java @@ -20,17 +20,12 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.consumer.Consumer; -import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; -import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; -import org.apache.eventmesh.storage.standalone.broker.task.SubscribeTask; import java.util.List; -import java.util.Objects; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import io.cloudevents.CloudEvent; @@ -45,16 +40,10 @@ public class StandaloneConsumer implements Consumer { private final ConcurrentHashMap subscribeTable; - private final ExecutorService consumeExecutorService; - public StandaloneConsumer(Properties properties) { this.standaloneBroker = StandaloneBroker.getInstance(); this.subscribeTable = new ConcurrentHashMap<>(16); this.isStarted = new AtomicBoolean(false); - this.consumeExecutorService = ThreadPoolFactory.createThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 2, - Runtime.getRuntime().availableProcessors() * 2, - "StandaloneConsumerThread"); } @Override @@ -86,8 +75,6 @@ public void init(Properties keyValue) throws Exception { @Override public void updateOffset(List cloudEvents, AbstractContext context) { - cloudEvents.forEach(cloudEvent -> standaloneBroker.updateOffset( - new TopicMetadata(cloudEvent.getSubject()), Objects.requireNonNull((Long) cloudEvent.getExtension("offset")))); } @@ -99,9 +86,8 @@ public void subscribe(String topic) throws Exception { synchronized (subscribeTable) { standaloneBroker.createTopicIfAbsent(topic); Subscribe subscribe = new Subscribe(topic, standaloneBroker, listener); - SubscribeTask subScribeTask = new SubscribeTask(subscribe); + subscribe.subscribe(); subscribeTable.put(topic, subscribe); - consumeExecutorService.execute(subScribeTask); } } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java index 5ea0ab6f1a..0c16aabb35 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java @@ -17,9 +17,14 @@ package org.apache.eventmesh.storage.standalone; +import org.apache.eventmesh.storage.standalone.broker.Channel; import org.apache.eventmesh.storage.standalone.broker.MessageQueue; +import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; +import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; + +import org.apache.commons.lang3.tuple.Pair; import java.net.URI; import java.util.Collections; @@ -29,6 +34,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; + public class TestUtils { public static final String TEST_TOPIC = "test-topic"; @@ -36,12 +42,18 @@ public class TestUtils { public static final int LENGTH = 5; public static final int EXCEEDED_MESSAGE_STORE_WINDOW = 60 * 60 * 1000 + 1000; - public static ConcurrentHashMap createDefaultMessageContainer() { - ConcurrentHashMap messageContainer = new ConcurrentHashMap<>(1); - messageContainer.put(new TopicMetadata(TEST_TOPIC), new MessageQueue()); - return messageContainer; + public static Pair, ConcurrentHashMap> createDefaultMessageContainer( + StandaloneBroker broker) { + ConcurrentHashMap messageContainer = new ConcurrentHashMap<>(1); + ConcurrentHashMap subscribeContainer = new ConcurrentHashMap<>(1); + + Subscribe subscribe = createSubscribe(broker); + subscribe.subscribe(); + subscribeContainer.put(new TopicMetadata(TEST_TOPIC), subscribe); + return Pair.of(messageContainer, subscribeContainer); } + public static ConcurrentHashMap createMessageContainer(TopicMetadata topicMetadata, MessageEntity messageEntity) throws InterruptedException { ConcurrentHashMap messageContainer = new ConcurrentHashMap<>(1); @@ -79,4 +91,15 @@ public static MessageEntity createMessageEntity(TopicMetadata topicMetadata, Clo offSet, currentTimeMillis); } + + public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) { + return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> { + }); + } + + public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List cloudEvents) { + return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> { + cloudEvents.add(cloudEvent); + }); + } } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java index 2d84df265c..7200f902ec 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java @@ -17,18 +17,20 @@ package org.apache.eventmesh.storage.standalone.admin; -import static org.apache.eventmesh.storage.standalone.TestUtils.LENGTH; -import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET; import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC; import static org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent; import static org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageContainer; import static org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageEntity; -import org.apache.eventmesh.api.admin.TopicProperties; +import org.apache.eventmesh.storage.standalone.broker.Channel; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; +import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata; +import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; -import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.concurrent.ConcurrentHashMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -52,6 +54,7 @@ public class StandaloneAdminTest { private StandaloneAdmin standaloneAdmin; + @BeforeEach public void setUp() { initStaticInstance(); @@ -69,13 +72,6 @@ public void testIsClosed() { Assertions.assertTrue(standaloneAdmin.isClosed()); } - @Test - public void testGetTopic() throws Exception { - List topicPropertiesList = standaloneAdmin.getTopic(); - Assertions.assertNotNull(topicPropertiesList); - Assertions.assertFalse(topicPropertiesList.isEmpty()); - } - @Test public void testCreateTopic() { standaloneAdmin.createTopic(TEST_TOPIC); @@ -88,21 +84,6 @@ public void testDeleteTopic() { Mockito.verify(standaloneBroker).deleteTopicIfExist(TEST_TOPIC); } - @Test - public void testGetEvent() throws Exception { - Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.TRUE); - Mockito.when(standaloneBroker.getMessage(TEST_TOPIC, OFF_SET)).thenReturn(createDefaultCloudEvent()); - List events = standaloneAdmin.getEvent(TEST_TOPIC, OFF_SET, LENGTH); - Assertions.assertNotNull(events); - Assertions.assertFalse(events.isEmpty()); - } - - @Test - public void testGetEvent_throwException() { - Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.FALSE); - Exception exception = Assertions.assertThrows(Exception.class, () -> standaloneAdmin.getEvent(TEST_TOPIC, OFF_SET, LENGTH)); - Assertions.assertEquals("The topic name doesn't exist in the message queue", exception.getMessage()); - } @Test public void testPublish() throws Exception { @@ -116,7 +97,11 @@ public void testPublish() throws Exception { private void initStaticInstance() { try (MockedStatic standaloneBrokerMockedStatic = Mockito.mockStatic(StandaloneBroker.class)) { standaloneBrokerMockedStatic.when(StandaloneBroker::getInstance).thenReturn(standaloneBroker); - Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(createDefaultMessageContainer()); + Pair, ConcurrentHashMap> pair = + createDefaultMessageContainer(standaloneBroker); + Mockito.when(standaloneBroker.getSubscribeContainer()).thenReturn(pair.getRight()); + Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(pair.getLeft()); + standaloneAdmin = new StandaloneAdmin(); } } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java index 3582f95ef5..6d84cb7800 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java @@ -17,15 +17,12 @@ package org.apache.eventmesh.storage.standalone.broker; -import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET; import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC; import static org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent; +import static org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe; import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity; - -import org.apache.commons.lang3.tuple.Pair; - -import java.util.concurrent.atomic.AtomicLong; +import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -34,6 +31,14 @@ public class StandaloneBrokerTest { + + public StandaloneBroker getStandaloneBroker() { + StandaloneBroker instance = StandaloneBroker.getInstance(); + Subscribe subscribe = createSubscribe(instance); + subscribe.subscribe(); + return instance; + } + @Test public void testGetInstance() { Assertions.assertNotNull(StandaloneBroker.getInstance()); @@ -41,49 +46,23 @@ public void testGetInstance() { @Test public void testCreateTopicIfAbsent() { - StandaloneBroker instance = StandaloneBroker.getInstance(); - Pair pair = instance.createTopicIfAbsent(TEST_TOPIC); + StandaloneBroker instance = getStandaloneBroker(); + Channel pair = instance.createTopicIfAbsent(TEST_TOPIC); Assertions.assertNotNull(pair); } @Test public void testPutMessage() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); + StandaloneBroker instance = getStandaloneBroker(); CloudEvent cloudEvent = createDefaultCloudEvent(); MessageEntity messageEntity = instance.putMessage(TEST_TOPIC, cloudEvent); Assertions.assertNotNull(messageEntity); } - @Test - public void testTakeMessage() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); - CloudEvent cloudEvent = createDefaultCloudEvent(); - instance.putMessage(TEST_TOPIC, cloudEvent); - CloudEvent message = instance.takeMessage(TEST_TOPIC); - Assertions.assertNotNull(message); - } - - @Test - public void testGetMessage() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); - CloudEvent cloudEvent = createDefaultCloudEvent(); - instance.putMessage(TEST_TOPIC, cloudEvent); - CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC); - Assertions.assertNotNull(cloudEventResult); - } - - @Test - public void testMessageWithOffSet() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); - CloudEvent cloudEvent = createDefaultCloudEvent(); - instance.putMessage(TEST_TOPIC, cloudEvent); - CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC, OFF_SET); - Assertions.assertNotNull(cloudEventResult); - } @Test public void testCheckTopicExist() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); + StandaloneBroker instance = getStandaloneBroker(); CloudEvent cloudEvent = createDefaultCloudEvent(); instance.putMessage(TEST_TOPIC, cloudEvent); boolean exists = instance.checkTopicExist(TEST_TOPIC); @@ -92,7 +71,7 @@ public void testCheckTopicExist() throws InterruptedException { @Test public void testDeleteTopicIfExist() throws InterruptedException { - StandaloneBroker instance = StandaloneBroker.getInstance(); + StandaloneBroker instance = getStandaloneBroker(); CloudEvent cloudEvent = createDefaultCloudEvent(); instance.putMessage(TEST_TOPIC, cloudEvent); instance.deleteTopicIfExist(TEST_TOPIC); diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java index bc11c9b0aa..3ef86bdd20 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java @@ -18,14 +18,11 @@ package org.apache.eventmesh.storage.standalone.broker.task; import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC; -import static org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import org.apache.eventmesh.api.EventListener; -import org.apache.eventmesh.api.EventMeshAsyncConsumeContext; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; import org.junit.jupiter.api.Assertions; @@ -35,8 +32,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import io.cloudevents.CloudEvent; - @ExtendWith(MockitoExtension.class) public class SubscribeTest { @@ -48,12 +43,9 @@ public class SubscribeTest { @Test public void testSubscribe() { - CloudEvent cloudEvent = createDefaultCloudEvent(); - Mockito.when(standaloneBroker.getMessage(anyString())).thenReturn(cloudEvent); - Mockito.when(standaloneBroker.getMessage(anyString(), anyLong())).thenReturn(cloudEvent); subscribe = new Subscribe(TEST_TOPIC, standaloneBroker, eventListener); subscribe.subscribe(); - Mockito.verify(eventListener).consume(any(CloudEvent.class), any(EventMeshAsyncConsumeContext.class)); + Mockito.verify(standaloneBroker).subscribed(anyString(), any(Subscribe.class)); } @Test diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java index 37cdc02c6a..4bfee4976f 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java @@ -17,8 +17,11 @@ package org.apache.eventmesh.storage.standalone.producer; +import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC; + import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.storage.standalone.TestUtils; +import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; import java.util.Properties; @@ -28,10 +31,13 @@ import io.cloudevents.CloudEvent; + + public class StandaloneProducerTest { private StandaloneProducer standaloneProducer; + @BeforeEach public void setUp() { standaloneProducer = new StandaloneProducer(new Properties()); @@ -61,6 +67,8 @@ public void testShutdown() { @Test public void testPublish() { + StandaloneBroker standaloneBroker = StandaloneBroker.getInstance(); + standaloneBroker.createTopicIfAbsent(TEST_TOPIC); CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent(); SendResult sendResult = standaloneProducer.publish(cloudEvent); Assertions.assertNotNull(sendResult);