diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/pom.xml b/dubbo-serialization-extensions/dubbo-serialization-protobuf/pom.xml index f9a65d48..e4815737 100644 --- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/pom.xml +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/pom.xml @@ -48,6 +48,10 @@ limitations under the License. protobuf-java-util 3.11.0 + + org.apache.dubbo + dubbo + diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectInput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectInput.java new file mode 100644 index 00000000..80092dab --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectInput.java @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dubbo.common.serialize.protobuf.support; + +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.hessian2.Hessian2ObjectInput; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; + +public class ProtobufObjectInput implements ObjectInput { + private final Hessian2ObjectInput delegate; + + ProtobufObjectInput(InputStream is) { + this.delegate = new Hessian2ObjectInput(is); + } + + @Override + public Object readObject() { + String className = null; + try { + className = readUTF(); + if (ProtobufUtils.canSerializeWithProtobuf(ProtobufUtils.loadClass(className))) { + byte[] data = readBytes(); + return deserializeWithProtobuf(data, ProtobufUtils.loadClass(className)); + } else { + return delegate.readObject(); + } + } catch (Throwable ex) { + String message = + className == null ? "Unable to deserializeWithProtobuf object" : "Unable to deserializeWithProtobuf object for " + className; + throw new ProtobufSerializationException(message, ex); + } + } + + private T deserializeWithProtobuf(byte[] data, Class clazz) throws IOException, NoSuchMethodException, + IllegalAccessException, InvocationTargetException { + try (ByteArrayInputStream newIs = new ByteArrayInputStream(data)) { + if (ProtobufUtils.canSerializeWithProtobuf(clazz)) { + return ProtobufUtils.deserializeWithProtobuf(newIs, clazz); + } else { + return (T) delegate.readObject(); + } + } + } + + @Override + public T readObject(Class cls) { + if (cls == null || cls == Object.class) { + return (T) readObject(); + } + + try { + String className = readUTF(); + if (ProtobufUtils.canSerializeWithProtobuf(ProtobufUtils.loadClass(className))) { + byte[] data = readBytes(); + return deserializeWithProtobuf(data, cls); + } + return delegate.readObject(cls); + } catch (Throwable ex) { + throw new ProtobufSerializationException("Unable to deserializeWithProtobuf object for " + cls.getName(), ex); + } + } + + @Override + public T readObject(Class cls, Type type) { + return readObject(cls); + } + + @Override + public boolean readBool() throws IOException { + return delegate.readBool(); + } + + @Override + public byte readByte() throws IOException { + return delegate.readByte(); + } + + @Override + public short readShort() throws IOException { + return delegate.readShort(); + } + + @Override + public int readInt() throws IOException { + return delegate.readInt(); + } + + @Override + public long readLong() throws IOException { + return delegate.readLong(); + } + + @Override + public float readFloat() throws IOException { + return delegate.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return delegate.readDouble(); + } + + @Override + public byte[] readBytes() throws IOException { + return delegate.readBytes(); + } + + @Override + public String readUTF() throws IOException { + return delegate.readUTF(); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectOutput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectOutput.java new file mode 100644 index 00000000..24ad2c34 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufObjectOutput.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dubbo.common.serialize.protobuf.support; + +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.hessian2.Hessian2ObjectOutput; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +public class ProtobufObjectOutput implements ObjectOutput { + private final Hessian2ObjectOutput delegate; + + public ProtobufObjectOutput(OutputStream outputStream) { + this.delegate = new Hessian2ObjectOutput(outputStream); + } + + /** + * when object insatanceOf protobuf object serialize with ProtobufBuilder + * + * @param obj object + */ + @Override + public void writeObject(Object obj) { + try { + Class clazz = obj.getClass(); + writeUTF(clazz.getName()); + if (ProtobufUtils.canSerializeWithProtobuf(clazz)) { + writeBytes(serializeToBytes(obj)); + } else { + delegate.writeObject(obj); + } + } catch (Throwable ex) { + String message = + obj == null ? "Unable to serialize object" : "Unable to serialize object for " + obj.getClass().getName(); + throw new ProtobufSerializationException(message, ex); + } + } + + private byte[] serializeToBytes(Object obj) throws IOException { + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + ProtobufUtils.serializeWithProtobuf(obj, os); + return os.toByteArray(); + } + } + + @Override + public void writeBool(boolean v) throws IOException { + delegate.writeBool(v); + } + + @Override + public void writeByte(byte v) throws IOException { + delegate.writeByte(v); + } + + @Override + public void writeShort(short v) throws IOException { + delegate.writeShort(v); + } + + @Override + public void writeInt(int v) throws IOException { + delegate.writeInt(v); + } + + @Override + public void writeLong(long v) throws IOException { + delegate.writeLong(v); + } + + @Override + public void writeFloat(float v) throws IOException { + delegate.writeFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + delegate.writeDouble(v); + } + + @Override + public void writeBytes(byte[] b) throws IOException { + delegate.writeBytes(b); + } + + @Override + public void writeBytes(byte[] b, int off, int len) throws IOException { + delegate.writeBytes(b, off, len); + } + + @Override + public void writeUTF(String v) throws IOException { + delegate.writeUTF(v); + } + + @Override + public void flushBuffer() throws IOException { + delegate.flushBuffer(); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerialization.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerialization.java new file mode 100644 index 00000000..be0e0d67 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerialization.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dubbo.common.serialize.protobuf.support; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.Serialization; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * @author lin + * Date: 2019-10-01 + */ +public class ProtobufSerialization implements Serialization { + //Serialization ID must less than SERIALIZATION_MASK and cannot be duplicated with other serializations + private static final int CONTENT_TYPE_ID = 30; + + /** + * The content type id should not be changed in any circumstance! + */ + @Override + public byte getContentTypeId() { + return CONTENT_TYPE_ID; + } + + @Override + public String getContentType() { + return "x-application/protobuf"; + } + + @Override + public ObjectOutput serialize(URL url, OutputStream output) { + return new ProtobufObjectOutput(output); + } + + @Override + public ObjectInput deserialize(URL url, InputStream input) { + return new ProtobufObjectInput(input); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerializationException.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerializationException.java new file mode 100644 index 00000000..c16fec68 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufSerializationException.java @@ -0,0 +1,7 @@ +package org.apache.dubbo.common.serialize.protobuf.support; + +public class ProtobufSerializationException extends RuntimeException { + public ProtobufSerializationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufUtils.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufUtils.java index 42285ca1..e6a04dcd 100644 --- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufUtils.java +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufUtils.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.common.serialize.protobuf.support; +import com.google.protobuf.GeneratedMessageV3; import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue; import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB.StackTraceElementProto; import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB.ThrowableProto; @@ -42,12 +43,27 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class ProtobufUtils { + private static final Map className2ClassMap = new ConcurrentHashMap(); + + private static final Object className2ClassMapLock = new Object(); + + private static final Object class2ParserMapLock = new Object(); + + private static ClassLoader classLoader = Thread.currentThread().getContextClassLoader();; + + private static final Map> class2ParserMap = new ConcurrentHashMap<>(); + static boolean isSupported(Class clazz) { if (clazz == null) { return false; @@ -184,6 +200,90 @@ private static StackTraceElementProto toStackTraceElement(StackTraceElement elem return builder.build(); } + public static boolean canSerializeWithProtobuf(Class clazz) { + if (clazz == null) { + return false; + } + + if (GeneratedMessageV3.class.isAssignableFrom(MessageLite.class)) { + return true; + } + + if (Map.class.isAssignableFrom(clazz) || Array.class.isAssignableFrom(clazz) || List.class.isAssignableFrom(clazz) + || String.class.isAssignableFrom(clazz) || Number.class.isAssignableFrom(clazz) + || Boolean.class.isAssignableFrom(clazz) || Throwable.class.isAssignableFrom(clazz) || clazz.isArray() + || clazz.isEnum()) { + return false; + } + return true; + } + + + public static void serializeWithProtobuf(Object o, OutputStream os) { + if (!(o instanceof MessageLite)) { + return; + } + try { + ((MessageLite) o).writeTo(os); + } catch (IOException e) { + throw new RuntimeException("Google PB序列化失败,序列化对象的类型为" + o.getClass().getName(), e); + } + } + + public static Class loadClass(String className) throws ClassNotFoundException { + Class clazz = className2ClassMap.get(className); + if (clazz == null) { + synchronized (className2ClassMapLock) { + clazz = className2ClassMap.get(className); + if (clazz == null) { + clazz = Class.forName(className, false, classLoader); + className2ClassMap.put(className, clazz); + } + } + } + return clazz; + } + + public static T deserializeWithProtobuf(InputStream is, Class clazz) throws InvalidProtocolBufferException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + if (!(MessageLite.class.isAssignableFrom(clazz))) { + return null; + } + Parser parser = getParser(clazz); + return (T) parser.parseFrom(is); + } + + /** + * According to Java + * Generated Code, every pb entity class has a parser method. + */ + @SuppressWarnings("unchecked") + private static Parser getParser(Class clazz) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Parser parser = class2ParserMap.get(clazz); + if (parser == null) { + synchronized (class2ParserMapLock) { + parser = class2ParserMap.get(clazz); + if (parser == null) { + try { + Method parserMethod = clazz.getMethod("parser"); + parser = (Parser) parserMethod.invoke(clazz); + } catch (NoSuchMethodException ex) { + // try to find from PARSER field + try { + Field parserField = clazz.getField("PARSER"); + parser = (Parser) parserField.get(clazz); + } catch (NoSuchFieldException ex2) { + // throw NoSuchMethodException instead of NoSuchFieldException + throw ex; + } + } + class2ParserMap.put(clazz, parser); + } + } + } + return parser; + } + private static final class MessageMarshaller { private final Parser parser; private final T defaultInstance; diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization index b18b8be4..befcd249 100644 --- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization +++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization @@ -1,2 +1,3 @@ protobuf-json=org.apache.dubbo.common.serialize.protobuf.support.GenericProtobufJsonSerialization -protobuf=org.apache.dubbo.common.serialize.protobuf.support.GenericProtobufSerialization \ No newline at end of file +protobuf=org.apache.dubbo.common.serialize.protobuf.support.GenericProtobufSerialization +generic-protobuf-json=org.apache.dubbo.common.serialize.protobuf.support.ProtobufSerialization