diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle index 210e348c86..6abc73dd96 100644 --- a/eventmesh-meta/eventmesh-meta-raft/build.gradle +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation project(":eventmesh-common") implementation "com.alipay.sofa:jraft-core:${jraftVersion}" implementation "com.alipay.sofa:rpc-grpc-impl:${jraftVersion}" + implementation group: 'com.caucho', name: 'hessian', version: '4.0.63' testImplementation 'org.junit.jupiter:junit-jupiter' } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java index 1af6d5c963..1f655eb93e 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java @@ -18,13 +18,13 @@ package org.apache.eventmesh.meta.raft; import org.apache.eventmesh.meta.raft.rpc.RequestResponse; +import org.apache.eventmesh.meta.raft.serialize.EventMeshHessianSerializer; import org.apache.commons.lang.StringUtils; import java.nio.ByteBuffer; import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; @@ -51,7 +51,7 @@ public void applyOperation(EventOperation opreation, EventClosure closure) { try { closure.setEventOperation(opreation); final Task task = new Task(); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(opreation))); + task.setData(ByteBuffer.wrap(EventMeshHessianSerializer.getInstance().serialize(opreation))); task.setDone(closure); this.server.getNode().apply(task); } catch (CodecException e) { diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java index a0607f5ab4..0d4690fb1c 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -21,6 +21,7 @@ import static org.apache.eventmesh.meta.raft.EventOperation.GET; import static org.apache.eventmesh.meta.raft.EventOperation.PUT; +import org.apache.eventmesh.meta.raft.serialize.EventMeshHessianSerializer; import org.apache.eventmesh.meta.raft.snapshot.MetaSnapshotFile; import org.apache.commons.lang.StringUtils; @@ -37,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.Status; @@ -121,7 +121,7 @@ public void onApply(Iterator iter) { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - eventOperation = SerializerManager.getSerializer(SerializerManager.Hessian2) + eventOperation = EventMeshHessianSerializer.getInstance() .deserialize(data.array(), EventOperation.class.getName()); } catch (final CodecException e) { e.printStackTrace(System.err); diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshHessianSerializer.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshHessianSerializer.java new file mode 100644 index 0000000000..646d69c0d3 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshHessianSerializer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.serialize; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.HessianSerializer; +import com.caucho.hessian.io.Hessian2Input; +import com.caucho.hessian.io.Hessian2Output; +import com.caucho.hessian.io.SerializerFactory; + +public class EventMeshHessianSerializer extends HessianSerializer { + + private SerializerFactory customizeSerializerFactory = new EventMeshSerializerFactory(); + + private static EventMeshHessianSerializer instance; + + private EventMeshHessianSerializer() { + } + + public static HessianSerializer getInstance() { + if (instance == null) { + synchronized (EventMeshHessianSerializer.class) { + if (instance == null) { + instance = new EventMeshHessianSerializer(); + } + } + } + return instance; + } + + @Override + public byte[] serialize(Object obj) throws CodecException { + ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + Hessian2Output output = new Hessian2Output(byteArray); + output.setSerializerFactory(customizeSerializerFactory); + try { + output.writeObject(obj); + output.close(); + } catch (IOException e) { + throw new CodecException("IOException occurred when Hessian serializer encode!", e); + } + + return byteArray.toByteArray(); + } + + @Override + public T deserialize(byte[] data, String classOfT) throws CodecException { + Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data)); + input.setSerializerFactory(customizeSerializerFactory); + Object resultObject; + try { + resultObject = input.readObject(); + input.close(); + } catch (IOException e) { + throw new CodecException("IOException occurred when Hessian serializer decode!", e); + } + return (T) resultObject; + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshSerializerFactory.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshSerializerFactory.java new file mode 100644 index 0000000000..d16796219d --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/serialize/EventMeshSerializerFactory.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.serialize; + +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import com.caucho.hessian.io.SerializerFactory; + +public class EventMeshSerializerFactory extends SerializerFactory { + EventMeshSerializerFactory() { + super(); + super.getClassFactory().setWhitelist(true); + allowBasicType(); + allowCollections(); + allowConcurrent(); + allowTime(); + super.getClassFactory().allow("org.apache.eventmesh.*"); + } + + private void allowBasicType() { + super.getClassFactory().allow(boolean.class.getCanonicalName()); + super.getClassFactory().allow(byte.class.getCanonicalName()); + super.getClassFactory().allow(char.class.getCanonicalName()); + super.getClassFactory().allow(double.class.getCanonicalName()); + super.getClassFactory().allow(float.class.getCanonicalName()); + super.getClassFactory().allow(int.class.getCanonicalName()); + super.getClassFactory().allow(long.class.getCanonicalName()); + super.getClassFactory().allow(short.class.getCanonicalName()); + super.getClassFactory().allow(Boolean.class.getCanonicalName()); + super.getClassFactory().allow(Byte.class.getCanonicalName()); + super.getClassFactory().allow(Character.class.getCanonicalName()); + super.getClassFactory().allow(Double.class.getCanonicalName()); + super.getClassFactory().allow(Float.class.getCanonicalName()); + super.getClassFactory().allow(Integer.class.getCanonicalName()); + super.getClassFactory().allow(Long.class.getCanonicalName()); + super.getClassFactory().allow(Short.class.getCanonicalName()); + + super.getClassFactory().allow(Number.class.getCanonicalName()); + super.getClassFactory().allow(Class.class.getCanonicalName()); + super.getClassFactory().allow(String.class.getCanonicalName()); + } + + private void allowCollections() { + super.getClassFactory().allow(List.class.getCanonicalName()); + super.getClassFactory().allow(ArrayList.class.getCanonicalName()); + super.getClassFactory().allow(LinkedList.class.getCanonicalName()); + + super.getClassFactory().allow(Set.class.getCanonicalName()); + super.getClassFactory().allow(HashSet.class.getCanonicalName()); + super.getClassFactory().allow(LinkedHashSet.class.getCanonicalName()); + super.getClassFactory().allow(TreeSet.class.getCanonicalName()); + + super.getClassFactory().allow(Map.class.getCanonicalName()); + super.getClassFactory().allow(HashMap.class.getCanonicalName()); + super.getClassFactory().allow(LinkedHashMap.class.getCanonicalName()); + super.getClassFactory().allow(TreeMap.class.getCanonicalName()); + super.getClassFactory().allow(WeakHashMap.class.getCanonicalName()); + + super.getClassFactory().allow("java.util.Arrays$ArrayList"); + super.getClassFactory().allow("java.util.Collections$EmptyList"); + super.getClassFactory().allow("java.util.Collections$EmptyMap"); + super.getClassFactory().allow("java.util.Collections$SingletonSet"); + super.getClassFactory().allow("java.util.Collections$SingletonList"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableCollection"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableList"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableMap"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableNavigableMap"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableNavigableSet"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableRandomAccessList"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableSet"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableSortedMap"); + super.getClassFactory().allow("java.util.Collections$UnmodifiableSortedSet"); + } + + private void allowConcurrent() { + super.getClassFactory().allow(AtomicBoolean.class.getCanonicalName()); + super.getClassFactory().allow(AtomicInteger.class.getCanonicalName()); + super.getClassFactory().allow(AtomicLong.class.getCanonicalName()); + super.getClassFactory().allow(AtomicReference.class.getCanonicalName()); + + super.getClassFactory().allow(ConcurrentMap.class.getCanonicalName()); + super.getClassFactory().allow(ConcurrentHashMap.class.getCanonicalName()); + super.getClassFactory().allow(ConcurrentSkipListMap.class.getCanonicalName()); + super.getClassFactory().allow(CopyOnWriteArrayList.class.getCanonicalName()); + } + + private void allowTime() { + super.getClassFactory().allow(SimpleDateFormat.class.getCanonicalName()); + super.getClassFactory().allow(DateTimeFormatter.class.getCanonicalName()); + super.getClassFactory().allow(Instant.class.getCanonicalName()); + super.getClassFactory().allow(LocalDate.class.getCanonicalName()); + super.getClassFactory().allow(LocalDateTime.class.getCanonicalName()); + super.getClassFactory().allow(LocalTime.class.getCanonicalName()); + super.getClassFactory().allow(TimeUnit.class.getCanonicalName()); + super.getClassFactory().allow(Date.class.getCanonicalName()); + super.getClassFactory().allow(Calendar.class.getCanonicalName()); + } + + +}