Skip to content

Commit

Permalink
init function
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Apr 24, 2024
1 parent 329d90f commit 2e6a821
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 5 deletions.
2 changes: 1 addition & 1 deletion eventmesh-meta/eventmesh-meta-raft/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies {

implementation project(":eventmesh-meta:eventmesh-meta-api")
implementation project(":eventmesh-common")

implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1'
implementation "com.alipay.sofa:jraft-core:1.3.14"
implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14"
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;


public abstract class EventClosure implements Closure {
Expand All @@ -34,6 +35,15 @@ public abstract class EventClosure implements Closure {

private EventOperation eventOperation;

public static EventClosure createDefaultEventClosure() {
return new EventClosure() {

@Override
public void run(Status status) {

}
};
}

public void setFuture(CompletableFuture<RequestResponse> future) {
this.future = future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class EventOperation implements Serializable {

public static final byte GET = 0x02;

public static final byte DELETE = 0x03;

private byte op;
private Map<String, String> data;

Expand All @@ -40,6 +42,9 @@ public static EventOperation createOpreation(RequestResponse response) {
return new EventOperation(PUT, response.getInfoMap());
} else if (response.getValue() == MetaRaftConstants.GET) {
return new EventOperation(GET, response.getInfoMap());
} else if (response.getValue() == MetaRaftConstants.DELETE) {
return new EventOperation(DELETE, response.getInfoMap());

}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void applyOperation(EventOperation opreation, EventClosure closure) {
this.server.getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode EventOperation";
e.printStackTrace(System.err);
closure.failure(errorMsg, StringUtils.EMPTY);
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.meta.raft;

import static org.apache.eventmesh.meta.raft.EventOperation.DELETE;
import static org.apache.eventmesh.meta.raft.EventOperation.GET;
import static org.apache.eventmesh.meta.raft.EventOperation.PUT;

Expand Down Expand Up @@ -90,6 +91,12 @@ public void onApply(Iterator iter) {
Map<String, String> tempTable = eventOperation.getData();
contentTable.putAll(tempTable);
break;
case DELETE:
Map<String, String> tempTable2 = eventOperation.getData();
tempTable2.forEach((key, value) -> {
contentTable.remove(key);
});
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper;
import org.apache.eventmesh.meta.raft.rpc.RequestResponse;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -45,12 +49,16 @@
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaService {

private static ObjectMapper objectMapper = new ObjectMapper();

private final AtomicBoolean initStatus = new AtomicBoolean(false);

private final AtomicBoolean startStatus = new AtomicBoolean(false);
Expand Down Expand Up @@ -183,14 +191,55 @@ public void removeMetaData(String key) {

@Override
public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException {
//key= IP@@PORT@@CLUSTER_NAME
String[] ipAndPort = eventMeshRegisterInfo.getEndPoint().split(":");
String clusterName = eventMeshRegisterInfo.getEventMeshClusterName();
String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName;
InfoInner infoInner = new InfoInner(eventMeshRegisterInfo);
String registerInfo = null;
try {
registerInfo = objectMapper.writeValueAsString(infoInner);
RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(key, registerInfo).build();
CompletableFuture<RequestResponse> future = commit(req, EventClosure.createDefaultEventClosure());
RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS);
if (requestResponse != null) {
return requestResponse.getSuccess();
}
} catch (Exception e) {
throw new MetaException("fail to serialize ", e);
}
return false;
}

@Override
public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException {
//key= IP@@PORT@@CLUSTER_NAME
String[] ipAndPort = eventMeshUnRegisterInfo.getEndPoint().split(":");
String clusterName = eventMeshUnRegisterInfo.getEventMeshClusterName();
String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName;
RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build();
try {
CompletableFuture<RequestResponse> future = commit(req, EventClosure.createDefaultEventClosure());
RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS);
if (requestResponse != null) {
return requestResponse.getSuccess();
}
} catch (Exception e) {
throw new MetaException(e.getMessage(), e);
}
return false;
}

@Data
class InfoInner implements Serializable {

EventMeshRegisterInfo eventMeshRegisterInfo;

public InfoInner(EventMeshRegisterInfo eventMeshRegisterInfo) {
this.eventMeshRegisterInfo = eventMeshRegisterInfo;
}
}

public CompletableFuture<RequestResponse> commit(RequestResponse requestResponse, EventClosure eventClosure)
throws RemotingException, InterruptedException {
CompletableFuture<RequestResponse> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ public interface MetaRaftConstants {

int GET = 2;

int RESPONSE = 3;
int DELETE = 3;

int RESPONSE = 4;
}
12 changes: 10 additions & 2 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,20 @@ eventMesh.security.validation.type.token=false
eventMesh.security.publickey=

# metaStorage plugin
eventMesh.metaStorage.plugin.enabled=false
eventMesh.metaStorage.plugin.type=nacos
eventMesh.metaStorage.plugin.enabled=true
eventMesh.metaStorage.plugin.type=raft
eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848
eventMesh.metaStorage.plugin.username=nacos
eventMesh.metaStorage.plugin.password=nacos

# metaStorage plugin: raft
eventMesh.metaStorage.raft.dataPath=/tmp/server1
eventMesh.metaStorage.raft.self=127.0.0.1:9091
eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093
eventMesh.metaStorage.raft.electionTimeout=3000
eventMesh.metaStorage.raft.snapshotInterval=30
eventMesh.metaStorage.raft.refreshLeaderInterval=30

# metaStorage plugin: nacos
#eventMesh.metaStorage.nacos.endpoint=
#eventMesh.metaStorage.nacos.accessKey=
Expand Down

0 comments on commit 2e6a821

Please sign in to comment.