diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index 47bb699c9fd64c793c72b0588b4fbf7ef3fa93b9..0f155e07c7e6bc580652215b4fb1fbb51ab8aae7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -40,7 +40,7 @@ public class ConsumerInterceptor { private NettyClient nettyClient = NettyClient.getInstance(); - public ConsumerInterceptor(Host host) { + ConsumerInterceptor(Host host) { this.host = host; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java index d44ed0408df0fde80c568eb20c0d6d1a9f533c27..3856ecbc4d851a6359a009edb434f83528f726f5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.rpc.codec; -import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; +import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils; +import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; +import org.apache.dolphinscheduler.rpc.serializer.Serializer; import java.util.List; @@ -49,9 +51,12 @@ public class NettyDecoder extends ByteToMessageDecoder { if (byteBuf.readableBytes() < dataLength) { byteBuf.resetReaderIndex(); } + + byte serializerType=1; byte[] data = new byte[dataLength]; byteBuf.readBytes(data); - Object obj = ProtoStuffUtils.deserialize(data, genericClass); + Serializer serializer=RpcSerializer.getSerializerByType(serializerType); + Object obj = serializer.deserialize(data, genericClass); list.add(obj); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java index 280fefc78cfec3e2d815c92ec2ee49c96f8498c8..9333a618bf4c1fb350a2c6bcc4158629e4cbb58b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.rpc.codec; -import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; +import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..f6d091855d926d6d383f33dbd6ae8d4c9260b8c2 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc.protocol; + +public class MessageHeader { + + private byte magic=(byte) 0xbabe; + + /** + * context length + */ + private int contextLength; + + /** + * context + */ + private byte[] context; + + private String requestId; + + + private byte type; + + private byte status; + + private byte serialization; + + + public int getContextLength() { + return contextLength; + } + + public void setContextLength(int contextLength) { + this.contextLength = contextLength; + } + + public byte[] getContext() { + return context; + } + + public void setContext(byte[] context) { + this.context = context; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public byte getType() { + return type; + } + + public void setType(byte type) { + this.type = type; + } + + public byte getStatus() { + return status; + } + + public void setStatus(byte status) { + this.status = status; + } + + public byte getSerialization() { + return serialization; + } + + public void setSerialization(byte serialization) { + this.serialization = serialization; + } + + public byte getMagic() { + return magic; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocol.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocol.java new file mode 100644 index 0000000000000000000000000000000000000000..a217ca3aec7033077dd0f27ec46b02307128e3df --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocol.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc.protocol; + +public class RpcProtocol{ + + private MessageHeader msgHeader; + + private T body; + + public MessageHeader getMsgHeader() { + return msgHeader; + } + + public void setMsgHeader(MessageHeader msgHeader) { + this.msgHeader = msgHeader; + } + + public T getBody() { + return body; + } + + public void setBody(T body) { + this.body = body; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..3cb3e0a77683fcbf8b0406c4830115ee7c0fcf83 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java @@ -0,0 +1,62 @@ +package org.apache.dolphinscheduler.rpc.serializer;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +public class ProtoStuffSerializer implements Serializer{ + + private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + + private static Map, Schema> schemaCache = new ConcurrentHashMap<>(); + + + @SuppressWarnings("unchecked") + private static Schema getSchema(Class clazz) { + return (Schema) schemaCache.computeIfAbsent(clazz, RuntimeSchema::createFrom); + } + + @Override + public byte[] serialize(T obj) throws IOException { + Class clazz = (Class) obj.getClass(); + Schema schema = getSchema(clazz); + byte[] data; + try { + data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); + } finally { + buffer.clear(); + } + return data; + } + + @Override + public T deserialize(byte[] data, Class clz) throws IOException { + Schema schema = getSchema(clz); + T obj = schema.newMessage(); + if (null == obj) { + return null; + } + ProtostuffIOUtil.mergeFrom(data, obj, schema); + return obj; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffUtils.java similarity index 97% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffUtils.java index 9014b82276ef517c3bb324efe642000dc4215885..ef2a8846f1669db823186ee28c95caaa1ad88920 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.serialize; +package org.apache.dolphinscheduler.rpc.serializer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..1ea4ab80be72fb2fd08f9db7cb15cd9432efc8e6 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java @@ -0,0 +1,45 @@ +package org.apache.dolphinscheduler.rpc.serializer;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; + +public enum RpcSerializer { + + + PROTOSTUFF((byte) 1, new ProtoStuffSerializer()); + + byte type; + + Serializer serializer; + + RpcSerializer(byte type, Serializer serializer) { + this.type = type; + this.serializer = serializer; + } + + private static HashMap SERIALIZERS_MAP = new HashMap<>(); + + static { + for (RpcSerializer rpcSerializer : RpcSerializer.values()) { + SERIALIZERS_MAP.put(rpcSerializer.type, rpcSerializer.serializer); + } + } + + public static Serializer getSerializerByType(byte type) { + return SERIALIZERS_MAP.get(type); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/Serializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/Serializer.java new file mode 100644 index 0000000000000000000000000000000000000000..f16d951e4eaeb8fb080eb84cf2f28e72e34c2bcb --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/Serializer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc.serializer; + +import java.io.IOException; + +public interface Serializer { + + byte[] serialize(T obj) throws IOException; + + T deserialize(byte[] data, Class clz) throws IOException; + +}