diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle index 99af858e10..4fb3b7bc4a 100644 --- a/eventmesh-meta/eventmesh-meta-raft/build.gradle +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -41,7 +41,7 @@ dependencies { implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-common") - + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1' implementation "com.alipay.sofa:jraft-core:1.3.14" implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14" testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0' diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java index 0b0411db80..a568a801dc 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; public abstract class EventClosure implements Closure { @@ -34,6 +35,15 @@ public abstract class EventClosure implements Closure { private EventOperation eventOperation; + public static EventClosure createDefaultEventClosure() { + return new EventClosure() { + + @Override + public void run(Status status) { + + } + }; + } public void setFuture(CompletableFuture future) { this.future = future; diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java index 1208900e8b..b8c7a6cd55 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java @@ -32,6 +32,8 @@ public class EventOperation implements Serializable { public static final byte GET = 0x02; + public static final byte DELETE = 0x03; + private byte op; private Map data; @@ -40,6 +42,9 @@ public static EventOperation createOpreation(RequestResponse response) { return new EventOperation(PUT, response.getInfoMap()); } else if (response.getValue() == MetaRaftConstants.GET) { return new EventOperation(GET, response.getInfoMap()); + } else if (response.getValue() == MetaRaftConstants.DELETE) { + return new EventOperation(DELETE, response.getInfoMap()); + } return null; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java index a2be9c831f..a203ba0e9e 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java @@ -56,7 +56,6 @@ public void applyOperation(EventOperation opreation, EventClosure closure) { this.server.getNode().apply(task); } catch (CodecException e) { String errorMsg = "Fail to encode EventOperation"; - e.printStackTrace(System.err); closure.failure(errorMsg, StringUtils.EMPTY); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java index 0803534177..ba9551be6c 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.meta.raft; +import static org.apache.eventmesh.meta.raft.EventOperation.DELETE; import static org.apache.eventmesh.meta.raft.EventOperation.GET; import static org.apache.eventmesh.meta.raft.EventOperation.PUT; @@ -90,6 +91,12 @@ public void onApply(Iterator iter) { Map tempTable = eventOperation.getData(); contentTable.putAll(tempTable); break; + case DELETE: + Map tempTable2 = eventOperation.getData(); + tempTable2.forEach((key, value) -> { + contentTable.remove(key); + }); + break; default: break; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java index 4f25162b81..7aa15c1047 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -28,10 +28,14 @@ import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; import org.apache.eventmesh.meta.raft.rpc.RequestResponse; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,12 +49,16 @@ import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaService { + private static ObjectMapper objectMapper = new ObjectMapper(); + private final AtomicBoolean initStatus = new AtomicBoolean(false); private final AtomicBoolean startStatus = new AtomicBoolean(false); @@ -183,14 +191,55 @@ public void removeMetaData(String key) { @Override public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException { + //key= IP@@PORT@@CLUSTER_NAME + String[] ipAndPort = eventMeshRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshRegisterInfo.getEventMeshClusterName(); + String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + InfoInner infoInner = new InfoInner(eventMeshRegisterInfo); + String registerInfo = null; + try { + registerInfo = objectMapper.writeValueAsString(infoInner); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(key, registerInfo).build(); + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + return requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException("fail to serialize ", e); + } return false; } @Override public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException { + //key= IP@@PORT@@CLUSTER_NAME + String[] ipAndPort = eventMeshUnRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshUnRegisterInfo.getEventMeshClusterName(); + String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + return requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException(e.getMessage(), e); + } return false; } + @Data + class InfoInner implements Serializable { + + EventMeshRegisterInfo eventMeshRegisterInfo; + + public InfoInner(EventMeshRegisterInfo eventMeshRegisterInfo) { + this.eventMeshRegisterInfo = eventMeshRegisterInfo; + } + } + public CompletableFuture commit(RequestResponse requestResponse, EventClosure eventClosure) throws RemotingException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java index a7f311234c..6be76855e9 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java @@ -41,5 +41,7 @@ public interface MetaRaftConstants { int GET = 2; - int RESPONSE = 3; + int DELETE = 3; + + int RESPONSE = 4; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService similarity index 100% rename from eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService rename to eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 91a422fa4c..3fa2936525 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -79,12 +79,20 @@ eventMesh.security.validation.type.token=false eventMesh.security.publickey= # metaStorage plugin -eventMesh.metaStorage.plugin.enabled=false -eventMesh.metaStorage.plugin.type=nacos +eventMesh.metaStorage.plugin.enabled=true +eventMesh.metaStorage.plugin.type=raft eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848 eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos +# metaStorage plugin: raft +eventMesh.metaStorage.raft.dataPath=/tmp/server1 +eventMesh.metaStorage.raft.self=127.0.0.1:9091 +eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093 +eventMesh.metaStorage.raft.electionTimeout=3000 +eventMesh.metaStorage.raft.snapshotInterval=30 +eventMesh.metaStorage.raft.refreshLeaderInterval=30 + # metaStorage plugin: nacos #eventMesh.metaStorage.nacos.endpoint= #eventMesh.metaStorage.nacos.accessKey=