Skip to content

Commit

Permalink
finish this function.
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Apr 24, 2024
1 parent 2e6a821 commit 77d2dbd
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,43 @@
import static org.apache.eventmesh.meta.raft.EventOperation.GET;
import static org.apache.eventmesh.meta.raft.EventOperation.PUT;

import org.apache.eventmesh.meta.raft.snapshot.MetaSnapshotFile;

import org.apache.commons.lang.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MetaStateMachine extends StateMachineAdapter {

private final AtomicLong leaderTerm = new AtomicLong(-1);

private static ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private Map<String, String> contentTable = new ConcurrentHashMap<>();

public boolean isLeader() {
Expand All @@ -49,11 +67,44 @@ public boolean isLeader() {
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
if (isLeader()) {
log.warn("Leader is not supposed to load snapshot");
return false;
}
if (reader.getFileMeta("data") == null) {
log.error("Fail to find data file in {}", reader.getPath());
return false;
}
final MetaSnapshotFile snapshot = new MetaSnapshotFile(reader.getPath() + File.separator + "data");
try {
Map<String, String> snapshotLoaded = objectMapper.readValue(snapshot.load(), Map.class);
contentTable.clear();
contentTable.putAll(snapshotLoaded);
return true;
} catch (final IOException e) {
log.error("Fail to load snapshot from {}", snapshot.getPath());
return false;
}

return true;
}

@Override
public void onSnapshotSave(SnapshotWriter writer, Closure done) {
executor.submit(() -> {
final MetaSnapshotFile snapshot = new MetaSnapshotFile(writer.getPath() + File.separator + "data");
try {
if (snapshot.save(objectMapper.writeValueAsString(contentTable))) {
if (writer.addFile("data")) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
}
} else {
done.run(new Status(RaftError.EIO, "Fail to save snapshot %s", snapshot.getPath()));
}
} catch (IOException e) {
done.run(new Status(RaftError.EIO, "Fail to deserialize snapshot %s", snapshot.getPath()));
}
});
}

@Override
Expand Down Expand Up @@ -90,12 +141,20 @@ public void onApply(Iterator iter) {
case PUT:
Map<String, String> tempTable = eventOperation.getData();
contentTable.putAll(tempTable);
log.info("update MetaStateMachine successfully {}", contentTable);
break;
case DELETE:
Map<String, String> tempTable2 = eventOperation.getData();
tempTable2.forEach((key, value) -> {
contentTable.remove(key);
String remove = contentTable.remove(key);
if (Objects.isNull(remove)) {
log.warn("delete MetaStateMachine key: {} fail.", remove);
} else {
log.info("delete MetaStateMachine key: {} successfully.", remove);
}

});

break;
default:
break;
Expand Down
Loading

0 comments on commit 77d2dbd

Please sign in to comment.