提交 ea3a4758 编写于 作者: 武汉红喜's avatar 武汉红喜

summer

上级 ce724df2
......@@ -4,4 +4,5 @@
- Add module `whatsmars-archetypes`
- Add more spring boot samples
- Add spring cloud gateway demo
- New whatsmars-rpc `summer`
- Remove some useless modules
\ No newline at end of file
......@@ -50,12 +50,16 @@
<disruptor.version>3.4.2</disruptor.version>
<dubbo.version>2.7.3</dubbo.version>
<fastjson.version>1.2.70</fastjson.version>
<fst.version>2.57</fst.version>
<guava.version>20.0</guava.version>
<hessian.version>4.0.7</hessian.version>
<hessian.version>4.0.38</hessian.version>
<javassist.version>3.24.1-GA</javassist.version>
<kryo.version>4.0.2</kryo.version>
<log4j.version>1.2.17</log4j.version>
<msgpack.version>0.6.12</msgpack.version>
<mybatis.version>3.4.5</mybatis.version>
<mybatis-spring.version>1.3.1</mybatis-spring.version>
<protobuf.version>3.2.0</protobuf.version>
<rocketmq.version>4.5.1</rocketmq.version>
<zkclient.version>0.9</zkclient.version>
</properties>
......@@ -144,6 +148,26 @@
<artifactId>hessian</artifactId>
<version>${hessian.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>${msgpack.version}</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
......
......@@ -51,10 +51,30 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
......
......@@ -106,6 +106,9 @@ broker会在一段时间后回查ProducerGroup里的其他实例,确认消息
需要一提的是,当消费者(不同jvm实例)都在同一台物理机上时,若指定instanceName,消费负载均衡将失效(每个实例都将消费所有消息)。
另外,在一个jvm里模拟集群消费时,必须指定不同的instanceName,否则启动时会提示ConsumerGroup已存在。
### RocketMQ Remoting
see remoting.md
### Q&A
#### Q1:分布式消息系统中,如何避免消息重复?
......
......@@ -15,9 +15,28 @@
<description>RPC探索</description>
<modules>
<module>whatsmars-remoting</module>
<module>whatsmars-serialization</module>
<module>summer-core</module>
<module>summer-transport-netty</module>
</modules>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -9,39 +9,28 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-serialization</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>Serialization of RPC</description>
<artifactId>summer-core</artifactId>
<dependencies>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.2.0</version>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
......
package org.hongxi.summer;
import org.hongxi.summer.codec.Codec;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.common.util.ByteUtils;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import org.hongxi.summer.exception.SummerErrorMsgConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.protocol.summer.SummerCodec;
import org.hongxi.summer.rpc.Request;
import org.hongxi.summer.rpc.Response;
import org.hongxi.summer.transport.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class CodecUtils {
private static final Logger logger = LoggerFactory.getLogger(CodecUtils.class);
public static byte[] encodeObjectToBytes(Channel channel, Codec codec, Object msg) {
try {
byte[] data = encodeMessage(channel, codec, msg);
short type = ByteUtils.bytes2short(data, 0);
if (type == SummerCodec.MAGIC) {
return encodeV1(msg, data);
} else {
throw new SummerFrameworkException("can not encode message, unknown magic:" + type);
}
} catch (IOException e) {
throw new SummerFrameworkException("encode error: isResponse=" + (msg instanceof Response), e,
SummerErrorMsgConstants.FRAMEWORK_ENCODE_ERROR);
}
}
private static byte[] encodeV1(Object msg, byte[] data) throws IOException {
long requestId = getRequestId(msg);
byte[] result = new byte[SummerCodec.HEADER_LENGTH + data.length];
ByteUtils.short2bytes(SummerConstants.NETTY_MAGIC_TYPE, result, 0);
result[3] = getType(msg);
ByteUtils.long2bytes(requestId, result, 4);
ByteUtils.int2bytes(data.length, result, 12);
System.arraycopy(data, 0, result, SummerCodec.HEADER_LENGTH, data.length);
return result;
}
private static byte[] encodeMessage(Channel channel, Codec codec, Object msg) throws IOException {
byte[] data;
if (msg instanceof Response) {
try {
data = codec.encode(channel, msg);
} catch (Exception e) {
logger.error("NettyEncoder encode error, identity=" + channel.getUrl().getIdentity(), e);
Response oriResponse = (Response) msg;
Response response = SummerFrameworkUtils.buildErrorResponse(oriResponse.getRequestId(), e);
data = codec.encode(channel, response);
}
} else {
data = codec.encode(channel, msg);
}
return data;
}
private static long getRequestId(Object message) {
if (message instanceof Request) {
return ((Request) message).getRequestId();
} else if (message instanceof Response) {
return ((Response) message).getRequestId();
} else {
return 0;
}
}
private static byte getType(Object message) {
if (message instanceof Request) {
return SummerConstants.FLAG_REQUEST;
} else if (message instanceof Response) {
return SummerConstants.FLAG_RESPONSE;
} else {
return SummerConstants.FLAG_OTHER;
}
}
}
package org.hongxi.summer.codec;
import org.apache.commons.lang3.StringUtils;
import org.hongxi.summer.common.extension.ExtensionLoader;
import org.hongxi.summer.exception.SummerErrorMsgConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.exception.SummerServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Created by shenhongxi on 2020/7/25.
*/
public abstract class AbstractCodec implements Codec {
private static final Logger logger = LoggerFactory.getLogger(AbstractCodec.class);
protected static ConcurrentMap<Integer, String> serializations;
protected void serialize(ObjectOutput output, Object message, Serialization serialization) throws IOException {
if (message == null) {
output.writeObject(null);
return;
}
output.writeObject(serialization.serialize(message));
}
protected Object deserialize(byte[] value, Class<?> type, Serialization serialization) throws IOException {
if (value == null) {
return null;
}
return serialization.deserialize(value, type);
}
public ObjectOutput createOutput(OutputStream out) {
try {
return new ObjectOutputStream(out);
} catch (Exception e) {
throw new SummerFrameworkException(this.getClass().getSimpleName() + " createOutput error", e,
SummerErrorMsgConstants.FRAMEWORK_ENCODE_ERROR);
}
}
public ObjectInput createInput(InputStream in) {
try {
return new ObjectInputStream(in);
} catch (Exception e) {
throw new SummerFrameworkException(this.getClass().getSimpleName() + " createInput error", e,
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
}
protected static synchronized void initAllSerialization() {
if (serializations == null) {
serializations = new ConcurrentHashMap<>();
try {
ExtensionLoader<Serialization> loader = ExtensionLoader.getExtensionLoader(Serialization.class);
List<Serialization> exts = loader.getExtensions();
for (Serialization s : exts) {
String old = serializations.put(s.getSerializationNumber(), loader.getSpiName(s.getClass()));
if (old != null) {
logger.warn("conflict serialization spi! serialization num :{}, old spi :{}, new spi :{}",
s.getSerializationNumber(), old, serializations.get(s.getSerializationNumber()));
}
}
} catch (Exception e) {
logger.warn("init all serializations failed", e);
}
}
}
protected Serialization getSerializationByNum(int serializationNum) {
if (serializations == null) {
initAllSerialization();
}
String name = serializations.get(serializationNum);
Serialization s = null;
if (StringUtils.isNotBlank(name)) {
s = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
}
if (s == null) {
throw new SummerServiceException("can not find serialization by number " + serializationNum);
}
return s;
}
}
package org.hongxi.summer.codec;
import org.hongxi.summer.common.extension.Scope;
import org.hongxi.summer.common.extension.Spi;
import org.hongxi.summer.transport.Channel;
import java.io.IOException;
/**
* Created by shenhongxi on 2020/6/25.
*/
@Spi(scope = Scope.PROTOTYPE)
public interface Codec {
byte[] encode(Channel channel, Object message) throws IOException;
Object decode(Channel channel, String remoteIp, byte[] data) throws IOException;
}
package org.hongxi.summer.codec;
import org.hongxi.summer.common.extension.Scope;
import org.hongxi.summer.common.extension.Spi;
import java.io.IOException;
/**
* Created by shenhongxi on 2020/7/25.
*/
@Spi(scope = Scope.SINGLETON)
public interface Serialization {
byte[] serialize(Object obj) throws IOException;
<T> T deserialize(byte[] bytes, Class<T> clz) throws IOException;
byte[] serializeMulti(Object[] data) throws IOException;
Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException;
/**
* serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。
* @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。
*/
int getSerializationNumber();
}
package org.hongxi.summer.common;
/**
* Created by shenhongxi on 2020/6/25.
*/
public enum ChannelState {
UNINIT(0),
INIT(1),
ALIVE(2),
UNALIVE(3),
CLOSE(4);
private final int value;
ChannelState(int value) {
this.value = value;
}
public int value() {
return value;
}
public boolean isUnInitState() {
return this == UNINIT;
}
public boolean isInitState() {
return this == INIT;
}
public boolean isAliveState() {
return this == ALIVE;
}
public boolean isUnAliveState() {
return this == UNALIVE;
}
public boolean isCloseState() {
return this == CLOSE;
}
}
package org.hongxi.summer.common;
/**
* future task state
*
* Created by shenhongxi on 2020/8/23.
*
*/
public enum FutureState {
/** the task is doing **/
DOING(0),
/** the task is done **/
DONE(1),
/** ths task is cancelled **/
CANCELLED(2);
public final int value;
FutureState(int value) {
this.value = value;
}
public boolean isCancelledState() {
return this == CANCELLED;
}
public boolean isDoneState() {
return this == DONE;
}
public boolean isDoingState() {
return this == DOING;
}
}
package org.hongxi.summer.common;
import java.io.File;
/**
* Created by shenhongxi on 2020/6/26.
*/
public class SummerConstants {
public static final String FRAMEWORK_NAME = "org/hongxi/summer";
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String PROTOCOL_SEPARATOR = "://";
public static final String PATH_SEPARATOR = File.separator;
public static final String NODE_TYPE_SERVICE = "service";
/**
* netty channel constants start
*/
public static final short NETTY_MAGIC_TYPE = (short) 0xF1F1;
public static final int NETTY_SHARE_CHANNEL_MIN_WORKER_THREADS = 40;
public static final int NETTY_SHARE_CHANNEL_MAX_WORKER_THREADS = 800;
public static final int NETTY_NOT_SHARE_CHANNEL_MIN_WORKER_THREADS = 20;
public static final int NETTY_NOT_SHARE_CHANNEL_MAX_WORKER_THREADS = 200;
public static final int NETTY_TIMEOUT_TIMER_PERIOD = 100;
public static final String ASYNC_SUFFIX = "Async";// suffix for async call.
public static final String DEFAULT_VERSION = "1.0";
// netty client max concurrent request TODO 2W is suitable?
public static final int NETTY_CLIENT_MAX_REQUEST = 20000;
// ------------------ summer protocol constants -----------------
public static final String SUMMER_GROUP = "S_g";
public static final String SUMMER_VERSION = "S_v";
public static final String SUMMER_PATH = "S_p";
public static final String SUMMER_METHOD = "S_m";
public static final String SUMMER_METHOD_DESC = "S_md";
public static final String SUMMER_AUTH = "S_a";
public static final String SUMMER_SOURCE = "S_s";// 调用方来源标识,等同与application
public static final String SUMMER_MODULE = "S_mdu";
public static final String SUMMER_PROXY_PROTOCOL = "S_pp";
public static final String SUMMER_INFO_SIGN = "S_is";
public static final String SUMMER_ERROR = "S_e";
public static final String SUMMER_PROCESS_TIME = "S_pt";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String PROTOCOL_INJVM = "injvm";
public static final String PROTOCOL_SUMMER = "org/hongxi/summer";
public static final String METHOD_CONFIG_PREFIX = "methodconfig.";
public static final byte FLAG_REQUEST = 0x00;
public static final byte FLAG_RESPONSE = 0x01;
public static final byte FLAG_RESPONSE_VOID = 0x03;
public static final byte FLAG_RESPONSE_EXCEPTION = 0x05;
public static final byte FLAG_RESPONSE_ATTACHMENT = 0x07;
public static final byte FLAG_OTHER = (byte) 0xFF;
}
package org.hongxi.summer.common;
/**
* Created by shenhongxi on 2020/6/27.
*/
public enum URLParamType {
version("version", SummerConstants.DEFAULT_VERSION),
requestTimeout("requestTimeout", 200),
/** request id from http interface **/
requestIdFromClient("requestIdFromClient", 0),
connectTimeout("connectTimeout", 1000),
minWorkerThreads("minWorkerThreads", 20),
maxWorkerThreads("maxWorkerThreads", 200),
maxContentLength("maxContentLength", 10 * 1024 * 1024),
maxServerConnections("maxServerConnections", 100000),
minClientConnections("minClientConnections", 2),
protocol("protocol", SummerConstants.PROTOCOL_SUMMER),
path("path", ""),
host("host", ""),
port("port", 0),
/**
* multi referer share the same channel
*/
shareChannel("shareChannel", false),
asyncInitConnection("asyncInitConnection", false),
fusingThreshold("fusingThreshold", 10),
heartbeatFactory("heartbeatFactory", "org/hongxi/summer"),
/************************** SPI start ******************************/
serialization("serialization", "hessian2"),
codec("codec", "org/hongxi/summer"),
/************************** SPI end ******************************/
group("group", "default_rpc"),
nodeType("nodeType", SummerConstants.NODE_TYPE_SERVICE),
gzip("gzip", false), // 是否开启gzip压缩
minGzipSize("minGzipSize", 1000), // 进行gz压缩的最小数据大小。超过此阈值才进行gz压缩
application("application", SummerConstants.FRAMEWORK_NAME),
module("module", SummerConstants.FRAMEWORK_NAME),
workerQueueSize("workerQueueSize", 0);
private String name;
private String value;
private int intValue;
private long longValue;
private boolean boolValue;
URLParamType(String name, String value) {
this.name = name;
this.value = value;
}
URLParamType(String name, int intValue) {
this.name = name;
this.value = String.valueOf(intValue);
this.intValue = intValue;
}
URLParamType(String name, long longValue) {
this.name = name;
this.value = String.valueOf(longValue);
this.longValue = longValue;
}
URLParamType(String name, boolean boolValue) {
this.name = name;
this.value = String.valueOf(boolValue);
this.boolValue = boolValue;
}
public String getName() {
return name;
}
public String value() {
return value;
}
public int intValue() {
return intValue;
}
public long longValue() {
return longValue;
}
public boolean boolValue() {
return boolValue;
}
}
package org.hongxi.summer.common.extension;
import java.lang.annotation.*;
/**
* Spi有多个实现时,可以根据条件进行过滤、排序后再返回。
*
* Created by shenhongxi on 2020/7/25.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Activation {
/** seq号越小,在返回的list<Instance>中的位置越靠前,尽量使用 0-100以内的数字 */
int sequence() default 20;
/** spi 的key,获取spi列表时,根据key进行匹配,当key中存在待过滤的search-key时,匹配成功 */
String[] key() default "";
/** 是否支持重试的时候也调用 */
boolean retry() default true;
}
package org.hongxi.summer.common.extension;
import java.util.Comparator;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class ActivationComparator<T> implements Comparator<T> {
/**
* sequence 大的排在后面,如果没有设置sequence的排到最前面
*/
@Override
public int compare(T o1, T o2) {
Activation p1 = o1.getClass().getAnnotation(Activation.class);
Activation p2 = o2.getClass().getAnnotation(Activation.class);
if (p1 == null) {
return 1;
} else if (p2 == null) {
return -1;
} else {
return p1.sequence() - p2.sequence();
}
}
}
package org.hongxi.summer.common.extension;
import org.apache.commons.lang3.StringUtils;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Created by shenhongxi on 2020/6/25.
*/
public class ExtensionLoader<T> {
private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
private static ConcurrentMap<Class<?>, ExtensionLoader<?>> extensionLoaders = new ConcurrentHashMap<>();
private ConcurrentMap<String, Class<T>> extensionClasses;
private ConcurrentMap<String, T> singletonInstances;
private Class<T> type;
private volatile boolean init;
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private ClassLoader classLoader;
private ExtensionLoader(Class<T> type) {
this(type, Thread.currentThread().getContextClassLoader());
}
private ExtensionLoader(Class<T> type, ClassLoader classLoader) {
this.type = type;
this.classLoader = classLoader;
}
public T getExtension(String name) {
if (name == null) return null;
checkInit();
try {
Spi spi = type.getAnnotation(Spi.class);
if (spi.scope() == Scope.SINGLETON) {
return getSingletonInstance(name);
}
Class<T> clazz = extensionClasses.get(name);
if (clazz == null) {
return null;
}
return clazz.newInstance();
} catch (Exception e) {
throw new SummerFrameworkException(type.getName() + ": Error get extension " + name, e);
}
}
private T getSingletonInstance(String name) throws IllegalAccessException, InstantiationException {
T obj = singletonInstances.get(name);
if (obj != null) {
return obj;
}
Class<T> clazz = extensionClasses.get(name);
if (clazz == null) {
return null;
}
synchronized (singletonInstances) {
obj = singletonInstances.get(name);
if (obj != null) {
return obj;
}
obj = clazz.newInstance();
singletonInstances.put(name, obj);
}
return obj;
}
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
checkInterfaceType(type);
ExtensionLoader<T> loader = (ExtensionLoader<T>) extensionLoaders.get(type);
if (loader == null) {
loader = initExtensionLoader(type);
}
return loader;
}
private static <T> void checkInterfaceType(Class<T> clazz) {
if (!clazz.isInterface()) {
throw new SummerFrameworkException(clazz.getName() + ": Extension type is not interface");
}
if (!clazz.isAnnotationPresent(Spi.class)) {
throw new SummerFrameworkException(clazz.getName() + ": Extension type without @Spi annotation");
}
}
private static synchronized <T> ExtensionLoader<T> initExtensionLoader(Class<T> type) {
ExtensionLoader<T> loader = (ExtensionLoader<T>) extensionLoaders.get(type);
if (loader == null) {
loader = new ExtensionLoader<>(type);
extensionLoaders.put(type, loader);
}
return loader;
}
public List<T> getExtensions() {
return getExtensions(null);
}
/**
* 有些地方需要spi的所有激活的instances,所以需要能返回一个列表的方法
* 注意:1 SpiMeta 中的active 为true
* 2 按照spiMeta中的sequence进行排序
*
* @return
*/
public List<T> getExtensions(String key) {
checkInit();
if (extensionClasses.size() == 0) {
return Collections.emptyList();
}
// 如果只有一个实现,直接返回
List<T> exts = new ArrayList<T>(extensionClasses.size());
// 多个实现,按优先级排序返回
for (Map.Entry<String, Class<T>> entry : extensionClasses.entrySet()) {
Activation activation = entry.getValue().getAnnotation(Activation.class);
if (StringUtils.isBlank(key)) {
exts.add(getExtension(entry.getKey()));
} else if (activation != null && activation.key() != null) {
for (String k : activation.key()) {
if (key.equals(k)) {
exts.add(getExtension(entry.getKey()));
break;
}
}
}
}
Collections.sort(exts, new ActivationComparator<T>());
return exts;
}
private void checkInit() {
if (!init) {
loadExtensionClasses();
}
}
private synchronized void loadExtensionClasses() {
if (init) return;
extensionClasses = loadExtensionClasses(SERVICES_DIRECTORY);
singletonInstances = new ConcurrentHashMap<>();
init = true;
}
private ConcurrentMap<String, Class<T>> loadExtensionClasses(String dir) {
String fullName = dir + type.getName();
List<String> classNames = new ArrayList<>();
try {
Enumeration<URL> urls;
if (classLoader == null) {
urls = ClassLoader.getSystemResources(fullName);
} else {
urls = classLoader.getResources(fullName);
}
if (urls == null || !urls.hasMoreElements()) {
return new ConcurrentHashMap<>();
}
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
parseUrl(type, url, classNames);
}
} catch (Exception e) {
throw new SummerFrameworkException(
"ExtensionLoader loadExtensionClasses error, services dir: " + dir + ", type: " + type.getClass(), e);
}
return loadClasses(classNames);
}
private void parseUrl(Class<T> type, URL url, List<String> classNames) {
InputStream inputStream = null;
BufferedReader reader = null;
try {
inputStream = url.openStream();
reader = new BufferedReader(new InputStreamReader(inputStream, SummerConstants.DEFAULT_CHARSET));
String line;
int lineNumber = 0;
while ((line = reader.readLine()) != null) {
parseLine(type, url, line, ++lineNumber, classNames);
}
} catch (Exception e) {
logger.error("{}: Error reading spi configuration file", type.getName(), e);
} finally {
try {
if (reader != null) reader.close();
if (inputStream != null) inputStream.close();
} catch (IOException e) {
logger.error("{}: Error closing spi configuration file", type.getName(), e);
}
}
}
private void parseLine(Class<T> type, URL url, String line, int lineNumber, List<String> classNames) {
int ci = line.indexOf('#');
if (ci > 0) line = line.substring(0, ci);
line = line.trim();
if (line.isEmpty()) return;
if (line.indexOf(' ') >= 0 || line.indexOf('\t') >= 0) {
throw new SummerFrameworkException(type.getName() + ": " + url + ": " + lineNumber + ": Illegal spi configuration-file syntax");
}
int cp = line.codePointAt(0);
if (!Character.isJavaIdentifierStart(cp)) {
throw new SummerFrameworkException(type.getName() + ": " + url + ": " + lineNumber + ": Illegal spi provider-class name: " + line);
}
// for (int i = Character.charCount(cp); i < line.length(); i += Character.charCount(cp)) {
// cp = line.codePointAt(i);
// if (!Character.isJavaIdentifierStart(cp) && cp != '.') {
// throw new SummerFrameworkException(type.getName() + ": " + url + ": " + lineNumber + ": Illegal spi provider-class name: " + line);
// }
// }
if (!classNames.contains(line)) {
classNames.add(line);
}
}
private ConcurrentMap<String, Class<T>> loadClasses(List<String> classNames) {
ConcurrentMap<String, Class<T>> classes = new ConcurrentHashMap<>();
for (String className : classNames) {
try {
Class<T> clazz;
if (classLoader == null) {
clazz = (Class<T>) Class.forName(className);
} else {
clazz = (Class<T>) Class.forName(className, true, classLoader);
}
checkExtensionType(clazz);
String spiName = getSpiName(clazz);
if (classes.containsKey(spiName)) {
throw new SummerFrameworkException(clazz + ": spi name already exists: " + spiName);
} else {
classes.put(spiName, clazz);
}
} catch (Exception e) {
logger.error(type.getName() + ": Error load spi class", e);
}
}
return classes;
}
private void checkExtensionType(Class<T> clazz) {
checkClassPublic(clazz);
checkConstructorPublic(clazz);
checkClassInherit(clazz);
}
private void checkClassPublic(Class<T> clazz) {
if (!Modifier.isPublic(clazz.getModifiers())) {
throw new SummerFrameworkException(clazz.getName() + "is not a public class");
}
}
private void checkConstructorPublic(Class<T> clazz) {
Constructor<?>[] constructors = clazz.getConstructors();
if (constructors == null || constructors.length == 0) {
throw new SummerFrameworkException(clazz.getName() + "has no public no-args constructor");
}
for (Constructor<?> constructor : constructors) {
if (Modifier.isPublic(constructor.getModifiers()) && constructor.getParameterTypes().length == 0) {
return;
}
}
throw new SummerFrameworkException(clazz.getName() + "has no public no-args constructor");
}
private void checkClassInherit(Class<T> clazz) {
if (!type.isAssignableFrom(clazz)) {
throw new SummerFrameworkException(clazz.getName() + "is not instanceof " + type.getName());
}
}
public String getSpiName(Class<?> clazz) {
SpiMeta spiMeta = clazz.getAnnotation(SpiMeta.class);
return (spiMeta != null && !"".equals(spiMeta.name())) ? spiMeta.name() : clazz.getSimpleName();
}
}
package org.hongxi.summer.common.extension;
public enum Scope {
/**
* 单例模式
*/
SINGLETON,
/**
* 多例模式
*/
PROTOTYPE
}
\ No newline at end of file
package org.hongxi.summer.common.extension;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Spi {
Scope scope() default Scope.PROTOTYPE;
}
\ No newline at end of file
package org.hongxi.summer.common.extension;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SpiMeta {
String name() default "";
}
\ No newline at end of file
package org.hongxi.summer.common.threadpool;
import org.hongxi.summer.common.SummerConstants;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by shenhongxi on 2020/7/6.
*/
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup threadGroup;
private final AtomicInteger currentThreadNumber = new AtomicInteger(1);
private final String namePrefix;
private int priority = Thread.NORM_PRIORITY;
private boolean isDaemon = false;
public DefaultThreadFactory() {
this(SummerConstants.FRAMEWORK_NAME);
}
public DefaultThreadFactory(String prefix) {
this(prefix, false);
}
public DefaultThreadFactory(String prefix, boolean isDaemon) {
this(prefix, isDaemon, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(String prefix, boolean isDaemon, int priority) {
SecurityManager s = System.getSecurityManager();
this.threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-";
this.isDaemon = isDaemon;
this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(threadGroup, r, namePrefix + currentThreadNumber.getAndIncrement(), 0);
thread.setDaemon(isDaemon);
thread.setPriority(priority);
return thread;
}
}
package org.hongxi.summer.common.threadpool;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
/**
* LinkedTransferQueue 能保证更高性能,相比与LinkedBlockingQueue有明显提升
*
* <pre>
* 1) 不过LinkedTransferQueue的缺点是没有队列长度控制,需要在外层协助控制
* </pre>
*
* Created by shenhongxi on 2020/7/6.
*
*/
public class ExecutorQueue extends LinkedTransferQueue<Runnable> {
private static final long serialVersionUID = -3392627914941820087L;
private StandardThreadPoolExecutor threadPoolExecutor;
public ExecutorQueue() {
super();
}
public void setThreadPoolExecutor(StandardThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}
public boolean force(Runnable command) {
if (threadPoolExecutor.isShutdown()) {
throw new RejectedExecutionException("Executor not running, cannot force a task into the queue");
}
return super.offer(command);
}
public boolean offer(Runnable command) {
int poolSize = threadPoolExecutor.getPoolSize();
if (poolSize == threadPoolExecutor.getMaximumPoolSize()) {
return super.offer(command);
}
if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {
return super.offer(command);
}
if (poolSize < threadPoolExecutor.getMaximumPoolSize()) {
return false;
}
return super.offer(command);
}
}
package org.hongxi.summer.common.threadpool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <pre>
*
* 代码和思路主要来自于:
*
* tomcat :
* org.apache.catalina.core.StandardThreadExecutor
*
* java.util.concurrent
* threadPoolExecutor execute执行策略: 优先offer到queue,queue满后再扩充线程到maxThread,如果已经到了maxThread就reject
* 比较适合于CPU密集型应用(比如runnable内部执行的操作都在JVM内部,memory copy, or compute等等)
*
* StandardThreadExecutor execute执行策略: 优先扩充线程到maxThread,再offer到queue,如果满了就reject
* 比较适合于业务处理需要远程资源的场景
*
* </pre>
*
* Created by shenhongxi on 2020/6/27.
*/
public class StandardThreadPoolExecutor extends ThreadPoolExecutor {
public static final int DEFAULT_MIN_THREADS = 20;
public static final int DEFAULT_MAX_THREADS = 200;
public static final int DEFAULT_MAX_IDLE_TIME = 60 * 1000; // 1 min
protected AtomicInteger submittedTasksCount;
private int maxSubmittedTasks;
public StandardThreadPoolExecutor() {
this(DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS);
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
this(corePoolSize, maximumPoolSize, maximumPoolSize);
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, maximumPoolSize);
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity) {
this(corePoolSize, maximumPoolSize, queueCapacity, Executors.defaultThreadFactory());
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, DEFAULT_MAX_IDLE_TIME, TimeUnit.MILLISECONDS, queueCapacity, threadFactory);
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity, Executors.defaultThreadFactory());
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity, threadFactory, new AbortPolicy());
}
public StandardThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler);
((ExecutorQueue) getQueue()).setThreadPoolExecutor(this);
submittedTasksCount = new AtomicInteger(0);
maxSubmittedTasks = maximumPoolSize + queueCapacity;
}
@Override
public void execute(Runnable command) {
int count = submittedTasksCount.incrementAndGet();
if (count > maxSubmittedTasks) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this);
}
try {
super.execute(command);
} catch (RejectedExecutionException e) {
if (!((ExecutorQueue) getQueue()).force(command)) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this);
}
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTasksCount.decrementAndGet();
}
public int getSubmittedTasksCount() {
return submittedTasksCount.get();
}
public int getMaxSubmittedTasks() {
return maxSubmittedTasks;
}
}
package org.hongxi.summer.common.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class ByteUtils {
/**
* 把byte数组中off开始的8个字节,转为long类型,高位在前
*
* @param bytes
* @param off
*/
public static long bytes2long(byte[] bytes, int off) {
return ((bytes[off + 7] & 0xFFL)) + ((bytes[off + 6] & 0xFFL) << 8) + ((bytes[off + 5] & 0xFFL) << 16)
+ ((bytes[off + 4] & 0xFFL) << 24) + ((bytes[off + 3] & 0xFFL) << 32) + ((bytes[off + 2] & 0xFFL) << 40)
+ ((bytes[off + 1] & 0xFFL) << 48) + (((long) bytes[off]) << 56);
}
/**
* 把long类型的value转为8个byte字节,放到byte数组的off开始的位置,高位在前
*
* @param value
* @param bytes
* @param off
*/
public static void long2bytes(long value, byte[] bytes, int off) {
bytes[off + 7] = (byte) value;
bytes[off + 6] = (byte) (value >>> 8);
bytes[off + 5] = (byte) (value >>> 16);
bytes[off + 4] = (byte) (value >>> 24);
bytes[off + 3] = (byte) (value >>> 32);
bytes[off + 2] = (byte) (value >>> 40);
bytes[off + 1] = (byte) (value >>> 48);
bytes[off] = (byte) (value >>> 56);
}
/**
* 把byte数组中off开始的4个字节,转为int类型,高位在前
*
* @param bytes
* @param off
*/
public static int bytes2int(byte[] bytes, int off) {
return ((bytes[off + 3] & 0xFF)) + ((bytes[off + 2] & 0xFF) << 8) + ((bytes[off + 1] & 0xFF) << 16) + ((bytes[off]) << 24);
}
/**
* 把int类型的value转为4个byte字节,放到byte数组的off开始的位置,高位在前
*
* @param value
* @param bytes
* @param off
*/
public static void int2bytes(int value, byte[] bytes, int off) {
bytes[off + 3] = (byte) value;
bytes[off + 2] = (byte) (value >>> 8);
bytes[off + 1] = (byte) (value >>> 16);
bytes[off] = (byte) (value >>> 24);
}
/**
* 把byte数组中off开始的2个字节,转为short类型,高位在前
*
* @param b
* @param off
*/
public static short bytes2short(byte[] b, int off) {
return (short) (((b[off + 1] & 0xFF)) + ((b[off] & 0xFF) << 8));
}
/**
* 把short类型的value转为2个byte字节,放到byte数组的off开始的位置,高位在前
*
* @param value
* @param bytes
* @param off
*/
public static void short2bytes(short value, byte[] bytes, int off) {
bytes[off + 1] = (byte) value;
bytes[off] = (byte) (value >>> 8);
}
public static byte[] gzip(byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
GZIPOutputStream gzip = null;
try {
gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.finish();
return bos.toByteArray();
} finally {
if (gzip != null) {
gzip.close();
}
}
}
public static byte[] unGzip(byte[] data) throws IOException {
GZIPInputStream gzip = null;
try {
gzip = new GZIPInputStream(new ByteArrayInputStream(data));
byte[] buf = new byte[2048];
int size = -1;
ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length + 1024);
while ((size = gzip.read(buf, 0, buf.length)) != -1) {
bos.write(buf, 0, size);
}
return bos.toByteArray();
} finally {
if (gzip != null) {
gzip.close();
}
}
}
}
package org.hongxi.summer.common.util;
import java.util.Collection;
import java.util.Map;
/**
* Created by shenhongxi on 2020/7/30.
*/
public abstract class CollectionUtils {
/**
* Return {@code true} if the supplied Collection is {@code null} or empty.
* Otherwise, return {@code false}.
* @param collection the Collection to check
* @return whether the given Collection is empty
*/
public static boolean isEmpty(Collection<?> collection) {
return (collection == null || collection.isEmpty());
}
/**
* Return {@code true} if the supplied Map is {@code null} or empty.
* Otherwise, return {@code false}.
* @param map the Map to check
* @return whether the given Map is empty
*/
public static boolean isEmpty(Map<?, ?> map) {
return (map == null || map.isEmpty());
}
}
package org.hongxi.summer.common.util;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.hongxi.summer.exception.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by shenhongxi on 2020/7/26.
*/
public class ExceptionUtils {
private static final Logger logger = LoggerFactory.getLogger(ExceptionUtils.class);
public static final StackTraceElement[] REMOTE_MOCK_STACK = new StackTraceElement[]{
new StackTraceElement("remoteClass", "remoteMethod", "remoteFile", 1)};
/**
* 判定是否是业务方的逻辑抛出的异常
* <p>
* <pre>
* true: 来自业务方的异常
* false: 来自框架本身的异常
* </pre>
*
* @param t
* @return
*/
public static boolean isBizException(Throwable t) {
return t instanceof SummerBizException;
}
/**
* 是否框架包装过的异常
*
* @param t
* @return
*/
public static boolean isSummerException(Throwable t) {
return t instanceof SummerAbstractException;
}
public static String toMessage(Exception e) {
JSONObject jsonObject = new JSONObject();
int type = 1;
int code = 500;
String errmsg = null;
if (e instanceof SummerFrameworkException) {
SummerFrameworkException sfe = (SummerFrameworkException) e;
type = 0;
code = sfe.getErrorCode();
errmsg = sfe.getOriginMessage();
} else if (e instanceof SummerServiceException) {
SummerServiceException mse = (SummerServiceException) e;
type = 1;
code = mse.getErrorCode();
errmsg = mse.getOriginMessage();
} else if (e instanceof SummerBizException) {
SummerBizException sbe = (SummerBizException) e;
type = 2;
code = sbe.getErrorCode();
errmsg = sbe.getOriginMessage();
if (sbe.getCause() != null) {
errmsg = errmsg + ", cause:" + sbe.getCause().getMessage();
}
} else {
errmsg = e.getMessage();
}
jsonObject.put("errcode", code);
jsonObject.put("errmsg", errmsg);
jsonObject.put("errtype", type);
return jsonObject.toString();
}
public static SummerAbstractException fromMessage(String msg) {
if (StringUtils.isNotBlank(msg)) {
try {
JSONObject jsonObject = JSONObject.parseObject(msg);
int type = jsonObject.getIntValue("errtype");
int errcode = jsonObject.getIntValue("errcode");
String errmsg = jsonObject.getString("errmsg");
SummerAbstractException e = null;
switch (type) {
case 1:
e = new SummerServiceException(errmsg, new SummerErrorMsg(errcode, errcode, errmsg));
break;
case 2:
e = new SummerBizException(errmsg, new SummerErrorMsg(errcode, errcode, errmsg));
break;
default:
e = new SummerFrameworkException(errmsg, new SummerErrorMsg(errcode, errcode, errmsg));
}
return e;
} catch (Exception e) {
logger.warn("build exception from msg fail. msg:{}", msg);
}
}
return null;
}
/**
* 覆盖给定exception的stack信息,server端产生业务异常时调用此类屏蔽掉server端的异常栈。
*
* @param e
*/
public static void setMockStackTrace(Throwable e) {
if (e != null) {
try {
e.setStackTrace(REMOTE_MOCK_STACK);
} catch (Exception e1) {
logger.warn("replace remote exception stack fail! {}", e1.getMessage());
}
}
}
}
package org.hongxi.summer.common.util;
/**
* Created by shenhongxi on 2020/7/28.
*/
public class MathUtils {
/**
* 针对int类型字符串进行解析,如果存在格式错误,则返回默认值(defaultValue)
* Parse intStr, return defaultValue when numberFormatException occurs
* @param intStr
* @param defaultValue
* @return
*/
public static int parseInt(String intStr, int defaultValue) {
try {
return Integer.parseInt(intStr);
} catch (NumberFormatException e) {
return defaultValue;
}
}
/**
* 针对long类型字符串进行解析,如果存在格式错误,则返回默认值(defaultValue)
* Parse longStr, return defaultValue when numberFormatException occurs
* @param longStr
* @param defaultValue
* @return
*/
public static long parseLong(String longStr, long defaultValue){
try {
return Long.parseLong(longStr);
} catch (NumberFormatException e) {
return defaultValue;
}
}
/**
* 通过二进制位操作将originValue转化为非负数:
* 0和正数返回本身
* 负数通过二进制首位取反转化为正数或0(Integer.MIN_VALUE将转换为0)
* return non-negative int value of originValue
* @param originValue
* @return positive int
*/
public static int getNonNegative(int originValue){
return 0x7fffffff & originValue;
}
/**
* 通过二进制位操作将originValue转化为非负数:
* 范围在[0-16777215] 之间
*
* @param originValue
* @return
*/
public static int getNonNegativeRange24bit(int originValue) {
return 0x00ffffff & originValue;
}
}
package org.hongxi.summer.common.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.*;
import java.util.Enumeration;
import java.util.Map;
import java.util.regex.Pattern;
/**
*
* 网络工具类
*
* Created by shenhongxi on 2020/8/22.
*/
public class NetUtils {
private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
public static final String LOCALHOST = "127.0.0.1";
public static final String ANYHOST = "0.0.0.0";
private static volatile InetAddress LOCAL_ADDRESS = null;
private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$");
private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$");
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
public static boolean isInvalidLocalHost(String host) {
return host == null || host.length() == 0 || host.equalsIgnoreCase("localhost") || host.equals("0.0.0.0")
|| (LOCAL_IP_PATTERN.matcher(host).matches());
}
public static boolean isValidLocalHost(String host) {
return !isInvalidLocalHost(host);
}
/**
* {@link #getLocalAddress(Map)}
*
* @return
*/
public static InetAddress getLocalAddress() {
return getLocalAddress(null);
}
/**
* <pre>
* 查找策略:首先看是否已经查到ip --> hostname对应的ip --> 根据连接目标端口得到的本地ip --> 轮询网卡
* </pre>
*
* @return loca ip
*/
public static InetAddress getLocalAddress(Map<String, Integer> destHostPorts) {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddressByHostname();
if (!isValidAddress(localAddress)) {
localAddress = getLocalAddressBySocket(destHostPorts);
}
if (!isValidAddress(localAddress)) {
localAddress = getLocalAddressByNetworkInterface();
}
if (isValidAddress(localAddress)) {
LOCAL_ADDRESS = localAddress;
}
return localAddress;
}
private static InetAddress getLocalAddressByHostname() {
try {
InetAddress localAddress = InetAddress.getLocalHost();
if (isValidAddress(localAddress)) {
return localAddress;
}
} catch (Throwable e) {
logger.warn("Failed to retriving local address by hostname:" + e);
}
return null;
}
private static InetAddress getLocalAddressBySocket(Map<String, Integer> destHostPorts) {
if (destHostPorts == null || destHostPorts.size() == 0) {
return null;
}
for (Map.Entry<String, Integer> entry : destHostPorts.entrySet()) {
String host = entry.getKey();
int port = entry.getValue();
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(host, port);
socket.connect(addr, 1000);
return socket.getLocalAddress();
} finally {
try {
socket.close();
} catch (Throwable e) {}
}
} catch (Exception e) {
logger.warn("Failed to retriving local address by connecting to dest host:port({}:{}) false",
host, port, e);
}
}
return null;
}
private static InetAddress getLocalAddressByNetworkInterface() {
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
if (interfaces != null) {
while (interfaces.hasMoreElements()) {
try {
NetworkInterface network = interfaces.nextElement();
Enumeration<InetAddress> addresses = network.getInetAddresses();
while (addresses.hasMoreElements()) {
try {
InetAddress address = addresses.nextElement();
if (isValidAddress(address)) {
return address;
}
} catch (Throwable e) {
logger.warn("Failed to retriving ip address", e);
}
}
} catch (Throwable e) {
logger.warn("Failed to retriving ip address", e);
}
}
}
} catch (Throwable e) {
logger.warn("Failed to retriving ip address", e);
}
return null;
}
public static boolean isValidAddress(String address) {
return ADDRESS_PATTERN.matcher(address).matches();
}
public static boolean isValidAddress(InetAddress address) {
if (address == null || address.isLoopbackAddress()) return false;
String name = address.getHostAddress();
return (name != null && !ANYHOST.equals(name) && !LOCALHOST.equals(name) && IP_PATTERN.matcher(name).matches());
}
//return ip to avoid lookup dns
public static String getHostName(SocketAddress socketAddress) {
if (socketAddress == null) {
return null;
}
if (socketAddress instanceof InetSocketAddress) {
InetAddress addr = ((InetSocketAddress) socketAddress).getAddress();
if(addr != null){
return addr.getHostAddress();
}
}
return null;
}
}
package org.hongxi.summer.common.util;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 反射相关的辅助类
*
* @author maijunsheng
* @version 创建时间:2013-5-23
*
*/
public class ReflectUtils {
public static final String PARAM_CLASS_SPLIT = ",";
public static final String EMPTY_PARAM = "void";
private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];
private static final ConcurrentMap<String, Class<?>> name2ClassCache = new ConcurrentHashMap<String, Class<?>>();
private static final ConcurrentMap<Class<?>, String> class2NameCache = new ConcurrentHashMap<Class<?>, String>();
private static final String[] PRIMITIVE_NAMES = new String[] {"boolean", "byte", "char", "double", "float", "int", "long", "short",
"void"};
private static final Class<?>[] PRIMITIVE_CLASSES = new Class[] {boolean.class, byte.class, char.class, double.class, float.class,
int.class, long.class, short.class, Void.TYPE};
private static final int PRIMITIVE_CLASS_NAME_MAX_LENGTH = 7;
/**
* 获取method方式的接口参数,以逗号分割,拼接clz列表。 如果没有参数,那么void表示
*
* @param method
* @return
*/
public static String getMethodParamDesc(Method method) {
if (method.getParameterTypes() == null || method.getParameterTypes().length == 0) {
return EMPTY_PARAM;
}
StringBuilder builder = new StringBuilder();
Class<?>[] clzs = method.getParameterTypes();
for (Class<?> clz : clzs) {
String className = getName(clz);
builder.append(className).append(PARAM_CLASS_SPLIT);
}
return builder.substring(0, builder.length() - 1);
}
/**
* 获取方法的标示 : method_name + "(" + paramDesc + ")"
*
* @param method
* @return
*/
public static String getMethodDesc(Method method) {
String methodParamDesc = getMethodParamDesc(method);
return getMethodDesc(method.getName(), methodParamDesc);
}
/**
* 获取方法的标示 : method_name + "(" + paramDesc + ")"
*
* @param
* @return
*/
public static String getMethodDesc(String methodName, String paramDesc) {
if (paramDesc == null) {
return methodName + "()";
} else {
return methodName + "(" + paramDesc + ")";
}
}
public static Class<?>[] forNames(String classList) throws ClassNotFoundException {
if (classList == null || "".equals(classList) || EMPTY_PARAM.equals(classList)) {
return EMPTY_CLASS_ARRAY;
}
String[] classNames = classList.split(PARAM_CLASS_SPLIT);
Class<?>[] classTypes = new Class<?>[classNames.length];
for (int i = 0; i < classNames.length; i++) {
String className = classNames[i];
classTypes[i] = forName(className);
}
return classTypes;
}
public static Class<?> forName(String className) throws ClassNotFoundException {
if (null == className || "".equals(className)) {
return null;
}
Class<?> clz = name2ClassCache.get(className);
if (clz != null) {
return clz;
}
clz = forNameWithoutCache(className);
// 应该没有内存消耗过多的可能,除非有些代码很恶心,创建特别多的类
name2ClassCache.putIfAbsent(className, clz);
return clz;
}
private static Class<?> forNameWithoutCache(String className) throws ClassNotFoundException {
if (!className.endsWith("[]")) { // not array
Class<?> clz = getPrimitiveClass(className);
clz = (clz != null) ? clz : Class.forName(className, true, Thread.currentThread().getContextClassLoader());
return clz;
}
int dimensionSiz = 0;
while (className.endsWith("[]")) {
dimensionSiz++;
className = className.substring(0, className.length() - 2);
}
int[] dimensions = new int[dimensionSiz];
Class<?> clz = getPrimitiveClass(className);
if (clz == null) {
clz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
}
return Array.newInstance(clz, dimensions).getClass();
}
/**
* 需要支持一维数组、二维数组等
*
* @param
* @return
*/
public static String getName(Class<?> clz) {
if (clz == null) {
return null;
}
String className = class2NameCache.get(clz);
if (className != null) {
return className;
}
className = getNameWithoutCache(clz);
// 与name2ClassCache同样道理,如果没有恶心的代码,这块内存大小应该可控
class2NameCache.putIfAbsent(clz, className);
return className;
}
private static String getNameWithoutCache(Class<?> clz) {
if (!clz.isArray()) {
return clz.getName();
}
StringBuilder sb = new StringBuilder();
while (clz.isArray()) {
sb.append("[]");
clz = clz.getComponentType();
}
return clz.getName() + sb.toString();
}
public static Class<?> getPrimitiveClass(String name) {
// check if is primitive class
if (name.length() <= PRIMITIVE_CLASS_NAME_MAX_LENGTH) {
int index = Arrays.binarySearch(PRIMITIVE_NAMES, name);
if (index >= 0) {
return PRIMITIVE_CLASSES[index];
}
}
return null;
}
/**
* 获取clz public method
*
* <pre>
* 1)不包含构造函数
* 2)不包含Object.class
* 3)包含该clz的父类的所有public方法
* </pre>
*
* @param clz
* @return
*/
public static List<Method> getPublicMethod(Class<?> clz) {
Method[] methods = clz.getMethods();
List<Method> ret = new ArrayList<Method>();
for (Method method : methods) {
boolean isPublic = Modifier.isPublic(method.getModifiers());
boolean isNotObjectClass = method.getDeclaringClass() != Object.class;
if (isPublic && isNotObjectClass) {
ret.add(method);
}
}
return ret;
}
public static Object getEmptyObject(Class<?> returnType) {
return getEmptyObject(returnType, new HashMap<Class<?>, Object>(), 0);
}
private static Object getEmptyObject(Class<?> returnType, Map<Class<?>, Object> emptyInstances, int level) {
if (level > 2) return null;
if (returnType == null) {
return null;
} else if (returnType == boolean.class || returnType == Boolean.class) {
return false;
} else if (returnType == char.class || returnType == Character.class) {
return '\0';
} else if (returnType == byte.class || returnType == Byte.class) {
return (byte) 0;
} else if (returnType == short.class || returnType == Short.class) {
return (short) 0;
} else if (returnType == int.class || returnType == Integer.class) {
return 0;
} else if (returnType == long.class || returnType == Long.class) {
return 0L;
} else if (returnType == float.class || returnType == Float.class) {
return 0F;
} else if (returnType == double.class || returnType == Double.class) {
return 0D;
} else if (returnType.isArray()) {
return Array.newInstance(returnType.getComponentType(), 0);
} else if (returnType.isAssignableFrom(ArrayList.class)) {
return new ArrayList<Object>(0);
} else if (returnType.isAssignableFrom(HashSet.class)) {
return new HashSet<Object>(0);
} else if (returnType.isAssignableFrom(HashMap.class)) {
return new HashMap<Object, Object>(0);
} else if (String.class.equals(returnType)) {
return "";
} else if (!returnType.isInterface()) {
try {
Object value = emptyInstances.get(returnType);
if (value == null) {
value = returnType.newInstance();
emptyInstances.put(returnType, value);
}
Class<?> cls = value.getClass();
while (cls != null && cls != Object.class) {
Field[] fields = cls.getDeclaredFields();
for (Field field : fields) {
Object property = getEmptyObject(field.getType(), emptyInstances, level + 1);
if (property != null) {
try {
if (!field.isAccessible()) {
field.setAccessible(true);
}
field.set(value, property);
} catch (Throwable e) {}
}
}
cls = cls.getSuperclass();
}
return value;
} catch (Throwable e) {
return null;
}
} else {
return null;
}
}
}
package org.hongxi.summer.common.util;
import java.util.concurrent.atomic.AtomicLong;
/**
* 通过requestId能够知道大致请求的时间
*
* <pre>
* 目前是 currentTimeMillis * (2^20) + offset.incrementAndGet()
*
* 通过 requestId / (2^20 * 1000) 能够得到秒
*
* </pre>
*
* Created by shenhongxi on 2020/8/22.
*
*/
public class RequestIdGenerator {
protected static final AtomicLong offset = new AtomicLong(0);
protected static final int BITS = 20;
protected static final long MAX_COUNT_PER_MILLIS = 1 << BITS;
/**
* 获取 requestId
*
* @return
*/
public static long getRequestId() {
long currentTime = System.currentTimeMillis();
long count = offset.incrementAndGet();
while(count >= MAX_COUNT_PER_MILLIS){
synchronized (RequestIdGenerator.class){
if(offset.get() >= MAX_COUNT_PER_MILLIS){
offset.set(0);
}
}
count = offset.incrementAndGet();
}
return (currentTime << BITS) + count;
}
}
package org.hongxi.summer.common.util;
import org.apache.commons.lang3.StringUtils;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.rpc.DefaultResponse;
import org.hongxi.summer.rpc.Request;
import org.hongxi.summer.rpc.URL;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class SummerFrameworkUtils {
public static String removeAsyncSuffix(String path) {
if (path != null && path.endsWith(SummerConstants.ASYNC_SUFFIX)) {
return path.substring(0, path.length() - SummerConstants.ASYNC_SUFFIX.length());
}
return path;
}
public static DefaultResponse buildErrorResponse(Request request, Exception e) {
return buildErrorResponse(request.getRequestId(), e);
}
public static DefaultResponse buildErrorResponse(long requestId, Exception e) {
DefaultResponse response = new DefaultResponse();
response.setRequestId(requestId);
response.setException(e);
return response;
}
/**
* protocol key: protocol://host:port/group/interface/version
*
* @param url
* @return
*/
public static String getProtocolKey(URL url) {
StringBuilder key = new StringBuilder();
key.append(url.getProtocol());
key.append(SummerConstants.PROTOCOL_SEPARATOR);
key.append(url.getServerPortStr());
key.append(SummerConstants.PATH_SEPARATOR);
key.append(url.getGroup());
key.append(SummerConstants.PATH_SEPARATOR);
key.append(url.getPath());
key.append(SummerConstants.PATH_SEPARATOR);
key.append(url.getVersion());
return key.toString();
}
/**
* 判断url:source和url:target是否可以使用共享的service channel(port) 对外提供服务
* <p>
* <pre>
* 1) protocol
* 2) codec
* 3) serialize
* 4) maxContentLength
* 5) maxServerConnection
* 6) maxWorkerThread
* 7) workerQueueSize
* 8) heartbeatFactory
* </pre>
*
* @param source
* @param target
* @return
*/
public static boolean checkIfCanShareServiceChannel(URL source, URL target) {
if (!StringUtils.equals(source.getProtocol(), target.getProtocol())) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.codec.getName()),
target.getParameter(URLParamType.codec.getName()))) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.serialization.getName()),
target.getParameter(URLParamType.serialization.getName()))) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.maxContentLength.getName()),
target.getParameter(URLParamType.maxContentLength.getName()))) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.maxServerConnections.getName()),
target.getParameter(URLParamType.maxServerConnections.getName()))) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.maxWorkerThreads.getName()),
target.getParameter(URLParamType.maxWorkerThreads.getName()))) {
return false;
}
if (!StringUtils.equals(source.getParameter(URLParamType.workerQueueSize.getName()),
target.getParameter(URLParamType.workerQueueSize.getName()))) {
return false;
}
return StringUtils.equals(source.getParameter(URLParamType.heartbeatFactory.getName()),
target.getParameter(URLParamType.heartbeatFactory.getName()));
}
/**
* 输出请求的关键信息: requestId=** interface=** method=**(**)
*
* @param request
* @return
*/
public static String toString(Request request) {
return "requestId=" + request.getRequestId() +
" interface=" + request.getInterfaceName() +
" method=" + request.getMethodName()
+ "(" + request.getParametersDesc() + ")";
}
}
package org.hongxi.summer.exception;
import org.hongxi.summer.rpc.RpcContext;
/**
* Created by shenhongxi on 2020/6/26.
*/
public abstract class SummerAbstractException extends RuntimeException {
private static final long serialVersionUID = -6842400415484759967L;
protected SummerErrorMsg summerErrorMsg = SummerErrorMsgConstants.FRAMEWORK_DEFAULT_ERROR;
protected String errorMsg;
public SummerAbstractException() {
super();
}
public SummerAbstractException(SummerErrorMsg summerErrorMsg) {
super();
this.summerErrorMsg = summerErrorMsg;
}
public SummerAbstractException(String message) {
super(message);
this.errorMsg = message;
}
public SummerAbstractException(String message, SummerErrorMsg summerErrorMsg) {
super(message);
this.summerErrorMsg = summerErrorMsg;
this.errorMsg = message;
}
public SummerAbstractException(String message, Throwable cause) {
super(message, cause);
this.errorMsg = message;
}
public SummerAbstractException(String message, Throwable cause, SummerErrorMsg summerErrorMsg) {
super(message, cause);
this.summerErrorMsg = summerErrorMsg;
this.errorMsg = message;
}
public SummerAbstractException(Throwable cause) {
super(cause);
}
public SummerAbstractException(Throwable cause, SummerErrorMsg summerErrorMsg) {
super(cause);
this.summerErrorMsg = summerErrorMsg;
}
@Override
public String getMessage() {
String message = getOriginMessage();
return String.format("error_message: %s, status: %d, error_code: %d, request_id: %s",
message, getStatus(), getErrorCode(), RpcContext.getContext().getRequestId());
}
public String getOriginMessage() {
if (summerErrorMsg == null) return super.getMessage();
if (errorMsg != null && !errorMsg.equals("")) {
return errorMsg;
}
return summerErrorMsg.getMessage();
}
public int getStatus() {
return summerErrorMsg != null ? summerErrorMsg.getStatus() : 0;
}
public int getErrorCode() {
return summerErrorMsg != null ? summerErrorMsg.getErrorCode() : 0;
}
public SummerErrorMsg getSummerErrorMsg() {
return summerErrorMsg;
}
}
package org.hongxi.summer.exception;
/**
* Created by shenhongxi on 2020/7/26.
*/
public class SummerBizException extends SummerAbstractException {
private static final long serialVersionUID = -9030222846555573201L;
public SummerBizException() {
super(SummerErrorMsgConstants.BIZ_DEFAULT_EXCEPTION);
}
public SummerBizException(SummerErrorMsg summerErrorMsg) {
super(summerErrorMsg);
}
public SummerBizException(String message) {
super(message, SummerErrorMsgConstants.BIZ_DEFAULT_EXCEPTION);
}
public SummerBizException(String message, SummerErrorMsg summerErrorMsg) {
super(message, summerErrorMsg);
}
public SummerBizException(String message, Throwable cause) {
super(message, cause, SummerErrorMsgConstants.BIZ_DEFAULT_EXCEPTION);
}
public SummerBizException(String message, Throwable cause, SummerErrorMsg summerErrorMsg) {
super(message, cause, summerErrorMsg);
}
public SummerBizException(Throwable cause) {
super(cause, SummerErrorMsgConstants.BIZ_DEFAULT_EXCEPTION);
}
public SummerBizException(Throwable cause, SummerErrorMsg summerErrorMsg) {
super(cause, summerErrorMsg);
}
}
package org.hongxi.summer.exception;
import java.io.Serializable;
/**
* Created by shenhongxi on 2020/6/26.
*/
public class SummerErrorMsg implements Serializable {
private static final long serialVersionUID = -5483348908144912517L;
private int status;
private int errorCode;
private String message;
public SummerErrorMsg(int status, int errorCode, String message) {
this.status = status;
this.errorCode = errorCode;
this.message = message;
}
public int getStatus() {
return status;
}
public int getErrorCode() {
return errorCode;
}
public String getMessage() {
return message;
}
}
package org.hongxi.summer.exception;
/**
* Created by shenhongxi on 2020/6/26.
*/
public class SummerErrorMsgConstants {
// service error status 503
public static final int SERVICE_DEFAULT_ERROR_CODE = 10001;
public static final int SERVICE_REJECT_ERROR_CODE = 10002;
public static final int SERVICE_TIMEOUT_ERROR_CODE = 10003;
public static final int SERVICE_TASK_CANCEL_ERROR_CODE = 10004;
// service error status 404
public static final int SERVICE_NOT_FOUND_ERROR_CODE = 10101;
// service error status 403
public static final int SERVICE_REQUEST_LENGTH_OUT_OF_LIMIT_ERROR_CODE = 10201;
// service error start
public static final SummerErrorMsg SERVICE_DEFAULT_ERROR =
new SummerErrorMsg(503, SERVICE_DEFAULT_ERROR_CODE, "service error");
public static final SummerErrorMsg SERVICE_REJECT =
new SummerErrorMsg(503, SERVICE_REJECT_ERROR_CODE, "service reject");
public static final SummerErrorMsg SERVICE_NOT_FOUND =
new SummerErrorMsg(404, SERVICE_NOT_FOUND_ERROR_CODE, "service not found");
public static final SummerErrorMsg SERVICE_TIMEOUT =
new SummerErrorMsg(503, SERVICE_TIMEOUT_ERROR_CODE, "service request timeout");
// framework error
public static final int FRAMEWORK_DEFAULT_ERROR_CODE = 20001;
public static final int FRAMEWORK_ENCODE_ERROR_CODE = 20002;
public static final int FRAMEWORK_DECODE_ERROR_CODE = 20003;
public static final int FRAMEWORK_INIT_ERROR_CODE = 20004;
public static final int FRAMEWORK_EXPORT_ERROR_CODE = 20005;
// biz error
public static final int BIZ_DEFAULT_ERROR_CODE = 30001;
public static final SummerErrorMsg FRAMEWORK_DEFAULT_ERROR =
new SummerErrorMsg(503, FRAMEWORK_DEFAULT_ERROR_CODE, "framework default error");
public static final SummerErrorMsg FRAMEWORK_ENCODE_ERROR =
new SummerErrorMsg(503, FRAMEWORK_ENCODE_ERROR_CODE, "framework encode error");
public static final SummerErrorMsg FRAMEWORK_DECODE_ERROR =
new SummerErrorMsg(503, FRAMEWORK_DECODE_ERROR_CODE, "framework decode error");
public static final SummerErrorMsg FRAMEWORK_INIT_ERROR =
new SummerErrorMsg(500, FRAMEWORK_INIT_ERROR_CODE, "framework init error");
public static final SummerErrorMsg FRAMEWORK_EXPORT_ERROR =
new SummerErrorMsg(503, FRAMEWORK_EXPORT_ERROR_CODE, "framework export error");
public static final SummerErrorMsg BIZ_DEFAULT_EXCEPTION =
new SummerErrorMsg(503, BIZ_DEFAULT_ERROR_CODE, "provider error");
}
package org.hongxi.summer.exception;
/**
* Created by shenhongxi on 2020/6/26.
*/
public class SummerFrameworkException extends SummerAbstractException {
private static final long serialVersionUID = -6860263607854518306L;
public SummerFrameworkException() {
super(SummerErrorMsgConstants.FRAMEWORK_DEFAULT_ERROR);
}
public SummerFrameworkException(SummerErrorMsg summerErrorMsg) {
super(summerErrorMsg);
}
public SummerFrameworkException(String message) {
super(message, SummerErrorMsgConstants.FRAMEWORK_DEFAULT_ERROR);
}
public SummerFrameworkException(String message, SummerErrorMsg summerErrorMsg) {
super(message, summerErrorMsg);
}
public SummerFrameworkException(String message, Throwable cause) {
super(message, cause, SummerErrorMsgConstants.FRAMEWORK_DEFAULT_ERROR);
}
public SummerFrameworkException(String message, Throwable cause, SummerErrorMsg summerErrorMsg) {
super(message, cause, summerErrorMsg);
}
public SummerFrameworkException(Throwable cause) {
super(cause, SummerErrorMsgConstants.FRAMEWORK_DEFAULT_ERROR);
}
public SummerFrameworkException(Throwable cause, SummerErrorMsg summerErrorMsg) {
super(cause, summerErrorMsg);
}
}
package org.hongxi.summer.exception;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class SummerServiceException extends SummerAbstractException {
private static final long serialVersionUID = 167949946546769763L;
public SummerServiceException() {
super(SummerErrorMsgConstants.SERVICE_DEFAULT_ERROR);
}
public SummerServiceException(SummerErrorMsg summerErrorMsg) {
super(summerErrorMsg);
}
public SummerServiceException(String message) {
super(message, SummerErrorMsgConstants.SERVICE_DEFAULT_ERROR);
}
public SummerServiceException(String message, SummerErrorMsg summerErrorMsg) {
super(message, summerErrorMsg);
}
public SummerServiceException(String message, Throwable cause) {
super(message, cause, SummerErrorMsgConstants.SERVICE_DEFAULT_ERROR);
}
public SummerServiceException(String message, Throwable cause, SummerErrorMsg summerErrorMsg) {
super(message, cause, summerErrorMsg);
}
public SummerServiceException(Throwable cause) {
super(cause, SummerErrorMsgConstants.SERVICE_DEFAULT_ERROR);
}
public SummerServiceException(Throwable cause, SummerErrorMsg summerErrorMsg) {
super(cause, summerErrorMsg);
}
}
/**
* Created by shenhongxi on 2020/7/25.
*/
package org.hongxi.summer.protocol;
\ No newline at end of file
package org.hongxi.summer.protocol.summer;
import org.hongxi.summer.codec.AbstractCodec;
import org.hongxi.summer.codec.Serialization;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.extension.ExtensionLoader;
import org.hongxi.summer.common.extension.SpiMeta;
import org.hongxi.summer.common.util.ByteUtils;
import org.hongxi.summer.common.util.ExceptionUtils;
import org.hongxi.summer.common.util.ReflectUtils;
import org.hongxi.summer.exception.SummerErrorMsgConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.rpc.DefaultRequest;
import org.hongxi.summer.rpc.DefaultResponse;
import org.hongxi.summer.rpc.Request;
import org.hongxi.summer.rpc.Response;
import org.hongxi.summer.transport.Channel;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
/**
* Created by shenhongxi on 2020/7/25.
*/
@SpiMeta(name = "org/hongxi/summer")
public class SummerCodec extends AbstractCodec {
public static final short MAGIC = (short) 0xF0F0;
public static final byte MASK = 0x07;
public static final int HEADER_LENGTH = 16;
public static final byte VERSION = 1;
@Override
public byte[] encode(Channel channel, Object message) throws IOException {
try {
if (message instanceof Request) {
return encodeRequest(channel, (Request) message);
} else if (message instanceof Response) {
return encodeResponse(channel, (Response) message);
}
} catch (Exception e) {
if (ExceptionUtils.isSummerException(e)) {
throw (RuntimeException) e;
} else {
throw new SummerFrameworkException("encode error: isResponse=" + (message instanceof Response), e,
SummerErrorMsgConstants.FRAMEWORK_ENCODE_ERROR);
}
}
throw new SummerFrameworkException("encode error: message type not support, " + message.getClass(),
SummerErrorMsgConstants.FRAMEWORK_ENCODE_ERROR);
}
/**
* decode data
*
* <pre>
* 对于client端:主要是来自server端的response or exception
* 对于server端: 主要是来自client端的request
* </pre>
*
* @param data
* @return
* @throws IOException
*/
@Override
public Object decode(Channel channel, String remoteIp, byte[] data) throws IOException {
if (data.length <= HEADER_LENGTH) {
throw new SummerFrameworkException("decode error: format problem",
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
short type = ByteUtils.bytes2short(data, 0);
if (type != MAGIC) {
throw new SummerFrameworkException("decode error: magic error",
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
if (data[2] != VERSION) {
throw new SummerFrameworkException("decode error: version error",
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
int bodyLength = ByteUtils.bytes2int(data, 12);
if (HEADER_LENGTH + bodyLength != data.length) {
throw new SummerFrameworkException("decode error: content length error",
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
byte flag = data[3];
byte dataType = (byte) (flag & MASK);
boolean isResponse = (dataType != SummerConstants.FLAG_REQUEST);
byte[] body = new byte[bodyLength];
System.arraycopy(data, HEADER_LENGTH, body, 0, bodyLength);
long requestId = ByteUtils.bytes2long(data, 4);
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(channel.getUrl().getParameter(URLParamType.serialization.getName(),
URLParamType.serialization.value()));
try {
if (isResponse) { // response
return decodeResponse(body, dataType, requestId, serialization);
} else {
return decodeRequest(body, requestId, serialization);
}
} catch (ClassNotFoundException e) {
throw new SummerFrameworkException("decode " + (isResponse ? "response" : "request") +
" error: class not found", e,
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
} catch (Exception e) {
if (ExceptionUtils.isSummerException(e)) {
throw (RuntimeException) e;
} else {
throw new SummerFrameworkException("decode error: isResponse=" + isResponse,
e, SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
}
}
/**
* request body 数据:
*
* <pre>
*
* body:
*
* byte[] data :
*
* serialize(interface_name, method_name, method_param_desc, method_param_value, attachments_size, attachments_value)
*
* method_param_desc: for_each (string.append(method_param_interface_name))
*
* method_param_value: for_each (method_param_name, method_param_value)
*
* attachments_value: for_each (attachment_name, attachment_value)
*
* </pre>
*
* @param request
* @return
* @throws IOException
*/
private byte[] encodeRequest(Channel channel, Request request) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutput output = createOutput(outputStream);
output.writeUTF(request.getInterfaceName());
output.writeUTF(request.getMethodName());
output.writeUTF(request.getParametersDesc());
Serialization serialization =
ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
channel.getUrl().getParameter(URLParamType.serialization.getName(),
URLParamType.serialization.value()));
if (request.getArguments() != null && request.getArguments().length > 0) {
for (Object obj : request.getArguments()) {
serialize(output, obj, serialization);
}
}
if (request.getAttachments() == null || request.getAttachments().isEmpty()) {
// empty attachments
output.writeInt(0);
} else {
output.writeInt(request.getAttachments().size());
for (Map.Entry<String, String> entry : request.getAttachments().entrySet()) {
output.writeUTF(entry.getKey());
output.writeUTF(entry.getValue());
}
}
output.flush();
byte[] body = outputStream.toByteArray();
byte flag = SummerConstants.FLAG_REQUEST;
output.close();
return encode(body, flag, request.getRequestId());
}
/**
* response body 数据:
*
* <pre>
*
* body:
*
* byte[] : serialize (result) or serialize (exception)
*
* </pre>
*
* @param channel
* @param value
* @return
* @throws IOException
*/
private byte[] encodeResponse(Channel channel, Response value) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutput output = createOutput(outputStream);
Serialization serialization =
ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
channel.getUrl().getParameter(URLParamType.serialization.getName(),
URLParamType.serialization.value()));
byte flag = 0;
output.writeLong(value.getProcessTime());
if (value.getException() != null) {
output.writeUTF(value.getException().getClass().getName());
serialize(output, value.getException(), serialization);
flag = SummerConstants.FLAG_RESPONSE_EXCEPTION;
} else if (value.getValue() == null) {
flag = SummerConstants.FLAG_RESPONSE_VOID;
} else {
output.writeUTF(value.getValue().getClass().getName());
serialize(output, value.getValue(), serialization);
flag = SummerConstants.FLAG_RESPONSE;
}
output.flush();
byte[] body = outputStream.toByteArray();
output.close();
return encode(body, flag, value.getRequestId());
}
/**
* 数据协议:
*
* <pre>
*
* header: 16个字节
*
* 0-15 bit : magic
* 16-23 bit : version
* 24-31 bit : extend flag , 其中: 29-30 bit: event 可支持4种event,比如normal, exception等, 31 bit : 0 is request , 1 is response
* 32-95 bit : request id
* 96-127 bit : body content length
*
* </pre>
*
* @param body
* @param flag
* @param requestId
* @return
* @throws IOException
*/
private byte[] encode(byte[] body, byte flag, long requestId) throws IOException {
byte[] header = new byte[HEADER_LENGTH];
int offset = 0;
// 0 - 15 bit : magic
ByteUtils.short2bytes(MAGIC, header, offset);
offset += 2;
// 16 - 23 bit : version
header[offset++] = VERSION;
// 24 - 31 bit : extend flag
header[offset++] = flag;
// 32 - 95 bit : requestId
ByteUtils.long2bytes(requestId, header, offset);
offset += 8;
// 96 - 127 bit : body content length
ByteUtils.int2bytes(body.length, header, offset);
byte[] data = new byte[header.length + body.length];
System.arraycopy(header, 0, data, 0, header.length);
System.arraycopy(body, 0, data, header.length, body.length);
return data;
}
private Object decodeRequest(byte[] body, long requestId, Serialization serialization) throws IOException, ClassNotFoundException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(body);
ObjectInput input = createInput(inputStream);
String interfaceName = input.readUTF();
String methodName = input.readUTF();
String paramtersDesc = input.readUTF();
DefaultRequest rpcRequest = new DefaultRequest();
rpcRequest.setRequestId(requestId);
rpcRequest.setInterfaceName(interfaceName);
rpcRequest.setMethodName(methodName);
rpcRequest.setParametersDesc(paramtersDesc);
rpcRequest.setArguments(decodeRequestParameter(input, paramtersDesc, serialization));
rpcRequest.setAttachments(decodeRequestAttachments(input));
input.close();
return rpcRequest;
}
private Object[] decodeRequestParameter(ObjectInput input, String parameterDesc, Serialization serialization) throws IOException,
ClassNotFoundException {
if (parameterDesc == null || parameterDesc.equals("")) {
return null;
}
Class<?>[] classTypes = ReflectUtils.forNames(parameterDesc);
Object[] paramObjs = new Object[classTypes.length];
for (int i = 0; i < classTypes.length; i++) {
paramObjs[i] = deserialize((byte[]) input.readObject(), classTypes[i], serialization);
}
return paramObjs;
}
private Map<String, String> decodeRequestAttachments(ObjectInput input) throws IOException, ClassNotFoundException {
int size = input.readInt();
if (size <= 0) {
return null;
}
Map<String, String> attachments = new HashMap<String, String>();
for (int i = 0; i < size; i++) {
attachments.put(input.readUTF(), input.readUTF());
}
return attachments;
}
private Object decodeResponse(byte[] body, byte dataType, long requestId, Serialization serialization) throws IOException,
ClassNotFoundException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(body);
ObjectInput input = createInput(inputStream);
long processTime = input.readLong();
DefaultResponse response = new DefaultResponse();
response.setRequestId(requestId);
response.setProcessTime(processTime);
if (dataType == SummerConstants.FLAG_RESPONSE_VOID) {
return response;
}
String className = input.readUTF();
Class<?> clz = ReflectUtils.forName(className);
Object result = deserialize((byte[]) input.readObject(), clz, serialization);
if (dataType == SummerConstants.FLAG_RESPONSE) {
response.setValue(result);
} else if (dataType == SummerConstants.FLAG_RESPONSE_EXCEPTION) {
response.setException((Exception) result);
} else {
throw new SummerFrameworkException("decode error: response dataType not support " + dataType,
SummerErrorMsgConstants.FRAMEWORK_DECODE_ERROR);
}
response.setRequestId(requestId);
input.close();
return response;
}
}
package org.hongxi.summer.rpc;
import java.util.concurrent.Executor;
/**
* Created by shenhongxi on 2020/8/22.
*/
public interface Callbackable {
void addFinishCallback(Runnable runnable, Executor executor);
void onFinish();
}
package org.hongxi.summer.rpc;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Created by shenhongxi on 2020/7/28.
*/
public class DefaultRequest implements Request, Serializable {
private static final long serialVersionUID = -6525078483477733530L;
private String interfaceName;
private String methodName;
private String parametersDesc;
private Object[] arguments;
private Map<String, String> attachments;
private int retries = 0;
private long requestId;
private int serializationNumber = 0;// default serialization is hession2
@Override
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
@Override
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
@Override
public String getParametersDesc() {
return parametersDesc;
}
public void setParametersDesc(String parametersDesc) {
this.parametersDesc = parametersDesc;
}
@Override
public Object[] getArguments() {
return arguments;
}
public void setArguments(Object[] arguments) {
this.arguments = arguments;
}
@Override
public Map<String, String> getAttachments() {
return attachments != null ? attachments : Collections.<String, String>emptyMap();
}
public void setAttachments(Map<String, String> attachments) {
this.attachments = attachments;
}
@Override
public void setAttachment(String key, String value) {
if (this.attachments == null) {
this.attachments = new HashMap<>();
}
this.attachments.put(key, value);
}
@Override
public long getRequestId() {
return requestId;
}
public void setRequestId(long requestId) {
this.requestId = requestId;
}
@Override
public int getRetries() {
return retries;
}
@Override
public void setRetries(int retries) {
this.retries = retries;
}
@Override
public void setSerializationNumber(int number) {
this.serializationNumber = number;
}
@Override
public int getSerializationNumber() {
return serializationNumber;
}
@Override
public String toString() {
return interfaceName + "." + methodName + "(" + parametersDesc + ") requestId=" + requestId;
}
}
package org.hongxi.summer.rpc;
import org.apache.commons.lang3.tuple.Pair;
import org.hongxi.summer.exception.SummerServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by shenhongxi on 2020/7/25.
*/
public class DefaultResponse implements Response, Callbackable, Serializable {
private static final long serialVersionUID = -46598719225168485L;
private static final Logger logger = LoggerFactory.getLogger(DefaultResponse.class);
private Object value;
private Exception exception;
private long requestId;
private long processTime;
private int timeout;
private Map<String, String> attachments;// rpc协议版本兼容时可以回传一些额外的信息
private int serializationNumber = 0;// default serialization is hession2
private List<Pair<Runnable, Executor>> taskList = new ArrayList<>();
private AtomicBoolean isFinished = new AtomicBoolean();
public DefaultResponse() {
}
public DefaultResponse(long requestId) {
this.requestId = requestId;
}
public DefaultResponse(Response response) {
this.value = response.getValue();
this.exception = response.getException();
this.requestId = response.getRequestId();
this.processTime = response.getProcessTime();
this.timeout = response.getTimeout();
this.serializationNumber = response.getSerializationNumber();
this.attachments = response.getAttachments();
}
public DefaultResponse(Object value) {
this.value = value;
}
public DefaultResponse(Object value, long requestId) {
this.value = value;
}
@Override
public Object getValue() {
if (exception != null) {
throw (exception instanceof RuntimeException) ? (RuntimeException) exception : new SummerServiceException(
exception.getMessage(), exception);
}
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
@Override
public long getRequestId() {
return requestId;
}
public void setRequestId(long requestId) {
this.requestId = requestId;
}
@Override
public long getProcessTime() {
return processTime;
}
@Override
public void setProcessTime(long time) {
this.processTime = time;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public Map<String, String> getAttachments() {
return attachments != null ? attachments : Collections.emptyMap();
}
@Override
public void setAttachment(String key, String value) {
if (attachments == null) {
attachments = new HashMap<>();
}
attachments.put(key, value);
}
public void setAttachments(Map<String, String> attachments) {
this.attachments = attachments;
}
@Override
public void setSerializationNumber(int number) {
this.serializationNumber = number;
}
@Override
public int getSerializationNumber() {
return serializationNumber;
}
@Override
public void addFinishCallback(Runnable runnable, Executor executor) {
if (!isFinished.get()) {
taskList.add(Pair.of(runnable, executor));
}
}
@Override
public void onFinish() {
if (!isFinished.compareAndSet(false, true)) {
return;
}
for (Pair<Runnable, Executor> pair : taskList) {
Runnable runnable = pair.getKey();
Executor executor = pair.getValue();
if (executor == null) {
runnable.run();
} else {
try {
executor.execute(runnable);
} catch (Exception e) {
logger.error("Callbackable response exec callback task error", e);
}
}
}
}
}
package org.hongxi.summer.rpc;
import org.hongxi.summer.common.FutureState;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import org.hongxi.summer.exception.SummerErrorMsgConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.exception.SummerServiceException;
import org.hongxi.summer.serialize.DeserializableObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
* Created by shenhongxi on 2020/8/23.
*/
public class DefaultResponseFuture implements ResponseFuture {
private static final Logger logger = LoggerFactory.getLogger(DefaultResponseFuture.class);
protected final Object lock = new Object();
protected volatile FutureState state = FutureState.DOING;
protected Object result = null;
protected Exception exception = null;
protected long createTime = System.currentTimeMillis();
protected int timeout = 0;
protected long processTime = 0;
protected Request request;
protected List<FutureListener> listeners;
protected URL serverUrl;
protected Class returnType;
private Map<String, String> attachments;// rpc协议版本兼容时可以回传一些额外的信息
public DefaultResponseFuture(Request requestObj, int timeout, URL serverUrl) {
this.request = requestObj;
this.timeout = timeout;
this.serverUrl = serverUrl;
}
@Override
public void onSuccess(Response response) {
this.result = response.getValue();
this.processTime = response.getProcessTime();
this.attachments = response.getAttachments();
done();
}
@Override
public void onFailure(Response response) {
this.exception = response.getException();
this.processTime = response.getProcessTime();
done();
}
@Override
public Object getValue() {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();
}
if (timeout <= 0) {
try {
lock.wait();
} catch (Exception e) {
cancel(new SummerServiceException(this.getClass().getName() +
" getValue InterruptedException : "
+ SummerFrameworkUtils.toString(request) +
" cost=" + (System.currentTimeMillis() - createTime), e));
}
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime > 0) {
for (; ; ) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
}
if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}
if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}
@Override
public Exception getException() {
return exception;
}
@Override
public boolean cancel() {
Exception e = new SummerServiceException(this.getClass().getName() +
" task cancel: serverPort=" + serverUrl.getServerPortStr() + " "
+ SummerFrameworkUtils.toString(request) +
" cost=" + (System.currentTimeMillis() - createTime));
return cancel(e);
}
protected boolean cancel(Exception e) {
synchronized (lock) {
if (!isDoing()) {
return false;
}
state = FutureState.CANCELLED;
exception = e;
lock.notifyAll();
}
notifyListeners();
return true;
}
@Override
public boolean isCancelled() {
return state.isCancelledState();
}
@Override
public boolean isDone() {
return state.isDoneState();
}
@Override
public boolean isSuccess() {
return isDone() && (exception == null);
}
@Override
public void addListener(FutureListener listener) {
if (listener == null) {
throw new NullPointerException("FutureListener is null");
}
boolean notifyNow = false;
synchronized (lock) {
if (!isDoing()) {
notifyNow = true;
} else {
if (listeners == null) {
listeners = new ArrayList<>(1);
}
listeners.add(listener);
}
}
if (notifyNow) {
notifyListener(listener);
}
}
@Override
public long getCreateTime() {
return createTime;
}
@Override
public void setReturnType(Class<?> clazz) {
this.returnType = clazz;
}
public Object getRequestObj() {
return request;
}
public FutureState getState() {
return state;
}
private void timeoutSoCancel() {
this.processTime = System.currentTimeMillis() - createTime;
synchronized (lock) {
if (!isDoing()) {
return;
}
state = FutureState.CANCELLED;
exception = new SummerServiceException(
this.getClass().getName() +
" request timeout: serverPort=" + serverUrl.getServerPortStr()
+ " " + SummerFrameworkUtils.toString(request) +
" cost=" + (System.currentTimeMillis() - createTime),
SummerErrorMsgConstants.SERVICE_TIMEOUT);
lock.notifyAll();
}
notifyListeners();
}
private void notifyListeners() {
if (listeners != null) {
for (FutureListener listener : listeners) {
notifyListener(listener);
}
}
}
private void notifyListener(FutureListener listener) {
try {
listener.operationComplete(this);
} catch (Throwable t) {
logger.error(this.getClass().getName() + " notifyListener Error: " + listener.getClass().getSimpleName(), t);
}
}
private boolean isDoing() {
return state.isDoingState();
}
protected boolean done() {
synchronized (lock) {
if (!isDoing()) {
return false;
}
state = FutureState.DONE;
lock.notifyAll();
}
notifyListeners();
return true;
}
@Override
public long getRequestId() {
return this.request.getRequestId();
}
private Object getValueOrThrowable() {
if (exception != null) {
throw (exception instanceof RuntimeException) ?
(RuntimeException) exception :
new SummerServiceException(exception.getMessage(), exception);
}
if (result != null && returnType != null && result instanceof DeserializableObject) {
try {
result = ((DeserializableObject) result).deserialize(returnType);
} catch (IOException e) {
logger.error("deserialize response value fail! return type: {}", returnType, e);
throw new SummerFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
}
}
return result;
}
@Override
public long getProcessTime() {
return processTime;
}
@Override
public void setProcessTime(long time) {
this.processTime = time;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public Map<String, String> getAttachments() {
return attachments != null ? attachments : Collections.<String, String>emptyMap();
}
@Override
public void setAttachment(String key, String value) {
if (this.attachments == null) {
this.attachments = new HashMap<>();
}
this.attachments.put(key, value);
}
@Override
public void setSerializationNumber(int number) {
}
@Override
public int getSerializationNumber() {
return 0;
}
}
package org.hongxi.summer.rpc;
/**
* Created by shenhongxi on 2020/7/30.
*/
public interface Future {
/**
* cancel the task
*
* @return
*/
boolean cancel();
/**
* task cancelled
*
* @return
*/
boolean isCancelled();
/**
* task is complete : normal or exception
*
* @return
*/
boolean isDone();
/**
* isDone() & normal
*
* @return
*/
boolean isSuccess();
/**
* if task is success, return the result.
*
* @throws Exception when timeout, cancel, onFailure
* @return
*/
Object getValue();
/**
* if task is done or cancel, return the exception
*
* @return
*/
Exception getException();
/**
* add future listener , when task is success,failure, timeout, cancel, it will be called
*
* @param listener
*/
void addListener(FutureListener listener);
}
package org.hongxi.summer.rpc;
/**
* 用于监听Future的success和fail事件
*
* Created by shenhongxi on 2020/7/30.
*/
public interface FutureListener {
/**
* <pre>
* 建议做一些比较简单的低功耗的操作
*
* 注意一些反模式:
*
* 1) 死循环:
* operationComplete(Future future) {
* ......
* future.addListener(this); // 类似于这种操作,后果你懂的
* ......
* }
*
* 2)耗资源操作或者慢操作:
* operationComplete(Future future) {
* ......
* Thread.sleep(500);
* ......
* }
*
* </pre>
*
* @param future
* @throws Exception
*/
void operationComplete(Future future) throws Exception;
}
package org.hongxi.summer.rpc;
import java.util.Map;
/**
* Created by shenhongxi on 2020/6/14.
*/
public interface Request {
/**
* service interface
*
* @return
*/
String getInterfaceName();
/**
* service method name
*
* @return
*/
String getMethodName();
/**
* service method param desc (sign)
*
* @return
*/
String getParametersDesc();
/**
* service method param
*
* @return
*/
Object[] getArguments();
/**
* get framework param
*
* @return
*/
Map<String, String> getAttachments();
/**
* set framework param
*
* @return
*/
void setAttachment(String name, String value);
/**
* request id
*
* @return
*/
long getRequestId();
/**
* retries
*
* @return
*/
int getRetries();
/**
* set retries
*/
void setRetries(int retries);
/**
* set the serialization number.
* same to the protocol version, this value only used in server end for compatible.
*
* @param number
*/
void setSerializationNumber(int number);
int getSerializationNumber();
}
package org.hongxi.summer.rpc;
import java.util.Map;
/**
* Created by shenhongxi on 2020/6/14.
*/
public interface Response {
/**
* <pre>
* 如果 request 正常处理,那么会返回 Object value,而如果 request 处理有异常,那么 getValue 会抛出异常
* </pre>
*
* @return
* @throws RuntimeException
*/
Object getValue();
/**
* 如果request处理有异常,那么调用该方法return exception 如果request还没处理完或者request处理正常,那么return null
* <p>
* <pre>
* 该方法不会阻塞,无论该request是处理中还是处理完成
* </pre>
*
* @return
*/
Exception getException();
/**
* 与 Request 的 requestId 相对应
*
* @return
*/
long getRequestId();
/**
* 业务处理时间
*
* @return
*/
long getProcessTime();
/**
* 业务处理时间
*
* @param time
*/
void setProcessTime(long time);
int getTimeout();
Map<String, String> getAttachments();
void setAttachment(String key, String value);
/**
* set the serialization number.
* same to the protocol version, this value only used in server end for compatible.
*
* @param number
*/
void setSerializationNumber(int number);
int getSerializationNumber();
}
package org.hongxi.summer.rpc;
/**
* Created by shenhongxi on 2020/7/30.
*/
public interface ResponseFuture extends Future, Response {
void onSuccess(Response response);
void onFailure(Response response);
long getCreateTime();
void setReturnType(Class<?> clazz);
}
package org.hongxi.summer.rpc;
import org.hongxi.summer.common.URLParamType;
import java.util.HashMap;
import java.util.Map;
/**
* Created by shenhongxi on 2020/6/26.
*/
public class RpcContext {
private static final ThreadLocal<RpcContext> LOCAL_CONTEXT = ThreadLocal.withInitial(() -> new RpcContext());
private Map<Object, Object> attributes = new HashMap<>();
private Map<String, String> attachments = new HashMap<>();
private Request request;
private Response response;
private String clientRequestId;
public static RpcContext getContext() {
return LOCAL_CONTEXT.get();
}
public static void destroy() {
LOCAL_CONTEXT.remove();
}
/**
* init new rpcContext with request
*
* @param request
* @return
*/
public static RpcContext init(Request request) {
RpcContext context = new RpcContext();
if (request != null) {
context.setRequest(request);
context.setClientRequestId(
request.getAttachments().get(URLParamType.requestIdFromClient.getName()));
}
LOCAL_CONTEXT.set(context);
return context;
}
public String getRequestId() {
if (clientRequestId != null) return clientRequestId;
if (request != null) return String.valueOf(request.getRequestId());
return null;
}
public void putAttribute(Object key, Object value) {
attributes.put(key, value);
}
public Object getAttribute(Object key) {
return attributes.get(key);
}
public void removeAttribute(Object key) {
attributes.remove(key);
}
public Map<Object, Object> getAttributes() {
return attributes;
}
public Request getRequest() {
return request;
}
public void setRequest(Request request) {
this.request = request;
}
public Response getResponse() {
return response;
}
public void setResponse(Response response) {
this.response = response;
}
public String getClientRequestId() {
return clientRequestId;
}
public void setClientRequestId(String clientRequestId) {
this.clientRequestId = clientRequestId;
}
}
package org.hongxi.summer.rpc;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by shenhongxi on 2020/6/14.
*/
public class URL {
private String protocol;
private String host;
private int port;
private String path;
private Map<String, String> parameters;
private volatile transient Map<String, Number> numbers;
public URL(String protocol, String host, int port, String path) {
this(protocol, host, port, path, new HashMap<>());
}
public URL(String protocol, String host, int port, String path, Map<String, String> parameters) {
this.protocol = protocol;
this.host = host;
this.port = port;
this.path = path;
this.parameters = parameters;
}
public Map<String, String> getParameters() {
return parameters;
}
public String getParameter(String name) {
return parameters.get(name);
}
public String getParameter(String name, String defaultValue) {
return parameters.getOrDefault(name, defaultValue);
}
public boolean getBooleanParameter(String name, boolean defaultValue) {
String value = getParameter(name);
if (value == null || value.isEmpty()) {
return defaultValue;
}
return Boolean.parseBoolean(value);
}
public int getIntParameter(String name, int defaultValue) {
String value = getParameter(name);
if (value == null || value.isEmpty()) {
return defaultValue;
}
return Integer.parseInt(value);
}
public String getServerPortStr() {
return buildHostPortStr(host, port);
}
private String buildHostPortStr(String host, int defaultPort) {
if (defaultPort <= 0) {
return host;
}
int idx = host.indexOf(":");
if (idx < 0) {
return host + ":" + defaultPort;
}
int port = Integer.parseInt(host.substring(idx + 1));
if (port <= 0) {
return host.substring(0, idx + 1) + ":" + defaultPort;
}
return host;
}
public URL createCopy() {
Map<String, String> params = new HashMap<String, String>();
if (this.parameters != null) {
params.putAll(this.parameters);
}
return new URL(protocol, host, port, path, params);
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = removeAsyncPath(path);
}
public String getVersion() {
return getParameter(URLParamType.version.getName(), URLParamType.version.value());
}
public String getGroup() {
return getParameter(URLParamType.group.getName(), URLParamType.group.value());
}
public Object getUri() {
return protocol + SummerConstants.PROTOCOL_SEPARATOR + host + ":" + port
+ File.separator + path;
}
/**
* 返回一个service or referer的identity,如果两个url的identity相同,则表示相同的一个service或者referer
*
* @return
*/
public String getIdentity() {
return protocol + SummerConstants.PROTOCOL_SEPARATOR + host + ":" + port +
"/" + getParameter(URLParamType.group.getName(), URLParamType.group.value()) + "/" +
getPath() + "/" + getParameter(URLParamType.version.getName(), URLParamType.version.value()) +
"/" + getParameter(URLParamType.nodeType.getName(), URLParamType.nodeType.value());
}
public Integer getMethodParameter(String methodName, String paramDesc, String name, int defaultValue) {
String key = methodName + "(" + paramDesc + ")." + name;
Number n = getNumbers().get(key);
if (n != null) {
return n.intValue();
}
String value = getMethodParameter(methodName, paramDesc, name);
if (value == null || value.length() == 0) {
return defaultValue;
}
int i = Integer.parseInt(value);
getNumbers().put(key, i);
return i;
}
public String getMethodParameter(String methodName, String paramDesc, String name) {
String value = getParameter(SummerConstants.METHOD_CONFIG_PREFIX + methodName + "(" + paramDesc + ")." + name);
if (value == null || value.length() == 0) {
return getParameter(name);
}
return value;
}
private Map<String, Number> getNumbers() {
if (numbers == null) { // 允许并发重复创建
numbers = new ConcurrentHashMap<String, Number>();
}
return numbers;
}
/**
* because async call in client path with Async suffix,we need
* remove Async suffix in path for subscribe.
* @param path
* @return
*/
private String removeAsyncPath(String path){
return SummerFrameworkUtils.removeAsyncSuffix(path);
}
@Override
public int hashCode() {
int factor = 31;
int rs = 1;
rs = factor * rs + Objects.hashCode(protocol);
rs = factor * rs + Objects.hashCode(host);
rs = factor * rs + Objects.hashCode(port);
rs = factor * rs + Objects.hashCode(path);
rs = factor * rs + Objects.hashCode(parameters);
return rs;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof URL)) {
return false;
}
URL ou = (URL) obj;
if (!Objects.equals(this.protocol, ou.protocol)) {
return false;
}
if (!Objects.equals(this.host, ou.host)) {
return false;
}
if (!Objects.equals(this.port, ou.port)) {
return false;
}
if (!Objects.equals(this.path, ou.path)) {
return false;
}
return Objects.equals(this.parameters, ou.parameters);
}
@Override
public String toString() {
return toSimpleString();
}
// 包含协议、host、port、path、group
public String toSimpleString() {
return getUri() + "?group=" + getGroup();
}
}
package org.hongxi.summer.serialize;
import org.hongxi.summer.codec.Serialization;
import java.io.IOException;
/**
* Created by shenhongxi on 2020/7/28.
*/
public class DeserializableObject {
private Serialization serialization;
private byte[] objBytes;
public DeserializableObject(Serialization serialization, byte[] objBytes) {
this.serialization = serialization;
this.objBytes = objBytes;
}
public <T> T deserialize(Class<T> clz) throws IOException {
return serialization.deserialize(objBytes, clz);
}
public Object[] deserializeMulti(Class<?>[] paramTypes) throws IOException {
Object[] ret = null;
if (paramTypes != null && paramTypes.length > 0) {
ret = serialization.deserializeMulti(objBytes, paramTypes);
}
return ret;
}
}
package org.hongxi.summer.serialize;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.JSONSerializer;
import com.alibaba.fastjson.serializer.SerializeWriter;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.hongxi.summer.codec.Serialization;
import org.hongxi.summer.common.extension.SpiMeta;
import java.io.IOException;
import java.util.List;
/**
* fastjson 序列化
*
* <pre>
* 对于嵌套场景无法支持
* </pre>
*
* Created by shenhongxi on 2020/7/28.
*
*/
@SpiMeta(name = "fastjson")
public class FastJsonSerialization implements Serialization {
@Override
public byte[] serialize(Object data) throws IOException {
SerializeWriter out = new SerializeWriter();
JSONSerializer serializer = new JSONSerializer(out);
serializer.config(SerializerFeature.WriteEnumUsingToString, true);
serializer.config(SerializerFeature.WriteClassName, true);
serializer.write(data);
return out.toBytes("UTF-8");
}
@Override
public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
return JSON.parseObject(new String(data), clz);
}
@Override
public byte[] serializeMulti(Object[] data) throws IOException {
return serialize(data);
}
@Override
public Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException {
List<Object> list = JSON.parseArray(new String(data), classes);
if (list != null) {
return list.toArray();
}
return null;
}
@Override
public int getSerializationNumber() {
return 2;
}
}
package org.hongxi.summer.serialize;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import org.hongxi.summer.codec.Serialization;
import org.hongxi.summer.common.extension.SpiMeta;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* hession2 序列化,要求序列化的对象实现 java.io.Serializable 接口
*
* Created by shenhongxi on 2020/7/28.
*
*/
@SpiMeta(name = "hessian2")
public class Hessian2Serialization implements Serialization {
@Override
public byte[] serialize(Object data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output out = new Hessian2Output(bos);
out.writeObject(data);
out.flush();
return bos.toByteArray();
}
@SuppressWarnings("unchecked")
@Override
public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
return (T) input.readObject(clz);
}
@Override
public byte[] serializeMulti(Object[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output out = new Hessian2Output(bos);
for(Object obj: data){
out.writeObject(obj);
}
out.flush();
return bos.toByteArray();
}
@Override
public Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException {
Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
Object[] objects = new Object[classes.length];
for (int i = 0; i < classes.length; i++) {
objects[i] = input.readObject(classes[i]);
}
return objects;
}
@Override
public int getSerializationNumber() {
return 0;
}
}
package org.hongxi.summer.transport;
import org.hongxi.summer.codec.Codec;
import org.hongxi.summer.common.ChannelState;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.extension.ExtensionLoader;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.rpc.Request;
import org.hongxi.summer.rpc.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* Created by shenhongxi on 2020/7/28.
*/
public abstract class AbstractClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
protected InetSocketAddress localAddress;
protected InetSocketAddress remoteAddress;
protected URL url;
protected Codec codec;
protected volatile ChannelState state = ChannelState.UNINIT;
public AbstractClient(URL url) {
this.url = url;
this.codec =
ExtensionLoader.getExtensionLoader(Codec.class).getExtension(
url.getParameter(URLParamType.codec.getName(), URLParamType.codec.value()));
logger.info("init netty client. url: " + url.getHost() + "-" + url.getPath() + ", use codec: " + codec.getClass().getSimpleName());
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public void heartbeat(Request request) {
throw new SummerFrameworkException("heartbeat not support: " + SummerFrameworkUtils.toString(request));
}
public void setLocalAddress(InetSocketAddress localAddress) {
this.localAddress = localAddress;
}
public void setRemoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
}
package org.hongxi.summer.transport;
import org.hongxi.summer.codec.Codec;
import org.hongxi.summer.common.ChannelState;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.extension.ExtensionLoader;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.rpc.URL;
import java.net.InetSocketAddress;
import java.util.Collection;
/**
* Created by shenhongxi on 2020/6/25.
*/
public abstract class AbstractServer implements Server {
protected InetSocketAddress localAddress;
protected InetSocketAddress remoteAddress;
protected URL url;
protected Codec codec;
protected volatile ChannelState state = ChannelState.UNINIT;
public AbstractServer() {}
public AbstractServer(URL url) {
this.url = url;
this.codec = ExtensionLoader.getExtensionLoader(Codec.class).getExtension(
url.getParameter(URLParamType.codec.getName(), URLParamType.codec.value()));
}
@Override
public Collection<Channel> getChannels() {
throw new SummerFrameworkException(this.getClass().getName() + " getChannels() method not support " + url);
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
throw new SummerFrameworkException(this.getClass().getName() + " getChannels(InetSocketAddress) method not support " + url);
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
public void setLocalAddress(InetSocketAddress localAddress) {
this.localAddress = localAddress;
}
public void setRemoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
public void setUrl(URL url) {
this.url = url;
}
public void setCodec(Codec codec) {
this.codec = codec;
}
}
package org.hongxi.summer.transport;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.threadpool.DefaultThreadFactory;
import org.hongxi.summer.common.threadpool.StandardThreadPoolExecutor;
import org.hongxi.summer.common.util.CollectionUtils;
import org.hongxi.summer.common.util.MathUtils;
import org.hongxi.summer.exception.SummerServiceException;
import org.hongxi.summer.rpc.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by shenhongxi on 2020/7/28.
*/
public abstract class AbstractSharedPoolClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(AbstractSharedPoolClient.class);
private static final ThreadPoolExecutor EXECUTOR = new StandardThreadPoolExecutor(1, 300, 20000,
new DefaultThreadFactory("AbstractPoolClient-initPool-", true));
private final AtomicInteger idx = new AtomicInteger();
protected SharedObjectFactory<Channel> factory;
protected List<Channel> channels;
protected int connections;
public AbstractSharedPoolClient(URL url) {
super(url);
connections = url.getIntParameter(URLParamType.minClientConnections.getName(),
URLParamType.minClientConnections.intValue());
if (connections <= 0) {
connections = URLParamType.minClientConnections.intValue();
}
}
protected void initPool() {
factory = createChannelFactory();
channels = new ArrayList<>(connections);
for (int i = 0; i < connections; i++) {
channels.add(factory.makeObject());
}
initConnections(url.getBooleanParameter(URLParamType.asyncInitConnection.getName(),
URLParamType.asyncInitConnection.boolValue()));
}
protected abstract SharedObjectFactory createChannelFactory();
protected void initConnections(boolean async) {
if (async) {
EXECUTOR.execute(() -> createConnections());
} else {
createConnections();
}
}
private void createConnections() {
for (Channel channel : channels) {
try {
channel.open();
} catch (Exception e) {
logger.error("init pool create connect Error: url={}", url.getUri(), e);
}
}
}
protected Channel getChannel() {
int index = MathUtils.getNonNegativeRange24bit(idx.getAndIncrement());
Channel channel;
for (int i = index; i < connections + 1 + index; i++) {
channel = channels.get(i % connections);
if (!channel.isAvailable()) {
factory.rebuildObject(channel, i != connections + 1);
}
if (channel.isAvailable()) {
return channel;
}
}
String errorMsg = this.getClass().getSimpleName() + " getChannel Error: url=" + url.getUri();
logger.error(errorMsg);
throw new SummerServiceException(errorMsg);
}
protected void closeAllChannels() {
if (!CollectionUtils.isEmpty(channels)) {
for (Channel channel : channels) {
channel.close();
}
}
}
}
package org.hongxi.summer.transport;
import org.hongxi.summer.rpc.Request;
import org.hongxi.summer.rpc.Response;
import org.hongxi.summer.rpc.URL;
import java.net.InetSocketAddress;
/**
* Created by shenhongxi on 2020/6/14.
*/
public interface Channel {
InetSocketAddress getLocalAddress();
InetSocketAddress getRemoteAddress();
Response request(Request request) throws TransportException;
boolean open();
void close();
void close(int timeout);
boolean isClosed();
boolean isAvailable();
URL getUrl();
}
package org.hongxi.summer.transport;
import org.hongxi.summer.rpc.Request;
/**
* Created by shenhongxi on 2020/7/28.
*/
public interface Client extends Endpoint {
void heartbeat(Request request);
}
package org.hongxi.summer.transport;
/**
* Created by shenhongxi on 2020/6/14.
*/
public interface Endpoint extends Channel {
}
package org.hongxi.summer.transport;
import org.hongxi.summer.common.extension.Scope;
import org.hongxi.summer.common.extension.Spi;
import org.hongxi.summer.rpc.URL;
/**
* Created by shenhongxi on 2020/7/31.
*/
@Spi(scope = Scope.SINGLETON)
public interface EndpointFactory {
/**
* create remote server
*
* @param url
* @param messageHandler
* @return
*/
Server createServer(URL url, MessageHandler messageHandler);
/**
* create remote client
*
* @param url
* @return
*/
Client createClient(URL url);
/**
* safe release server
*
* @param server
* @param url
*/
void safeReleaseResource(Server server, URL url);
/**
* safe release client
*
* @param client
* @param url
*/
void safeReleaseResource(Client client, URL url);
}
package org.hongxi.summer.transport;
/**
* Created by shenhongxi on 2020/6/27.
*/
public interface MessageHandler {
Object handle(Channel channel, Object message);
}
package org.hongxi.summer.transport;
import java.net.InetSocketAddress;
import java.util.Collection;
/**
* Created by shenhongxi on 2020/6/25.
*/
public interface Server extends Endpoint {
boolean isBound();
Collection<Channel> getChannels();
Channel getChannel(InetSocketAddress remoteAddress);
}
package org.hongxi.summer.transport;
/**
* Created by shenhongxi on 2020/7/28.
*/
public interface SharedObjectFactory<T> {
/**
* 创建对象
*
* @return
*/
T makeObject();
/**
* 重建对象
*
* @param obj
* @param async
* @return
*/
boolean rebuildObject(T obj, boolean async);
}
package org.hongxi.summer.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Created by shenhongxi on 2020/6/14.
*/
public class TransportException extends IOException {
private static final long serialVersionUID = -6184671228777275302L;
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
public TransportException(String message, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
super(message);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
public TransportException(String message, Throwable cause, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
super(message, cause);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
public InetSocketAddress getLocalAddress() {
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
}
package org.hongxi.summer.transport.support;
import org.hongxi.summer.common.URLParamType;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import org.hongxi.summer.exception.SummerErrorMsgConstants;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.rpc.URL;
import org.hongxi.summer.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* abstract endpoint factory
*
* <pre>
* 一些约定:
*
* 1) service :
* 1.1) not share channel : 某个service暴露服务的时候,不期望和别的service共享服务,明哲自保,比如你说:我很重要,我很重要。
*
* 1.2) share channel : 某个service 暴露服务的时候,如果有某个模块,但是拆成10个接口,可以使用这种方式,不过有一些约束条件:接口的几个serviceConfig配置需要保持一致。
*
* 不允许差异化的配置如下:
* protocol, codec , serialization, maxContentLength , maxServerConnection , maxWorkerThread, workerQueueSize, heartbeatFactory
*
* 2)心跳机制:
*
* 不同的protocol的心跳包格式可能不一样,无法进行强制,那么通过可扩展的方式,依赖heartbeatFactory进行heartbeat包的创建,
* 同时对于service的messageHandler进行wrap heartbeat包的处理。
*
* 对于service来说,把心跳包当成普通的request处理,因为这种heartbeat才能够探测到整个service处理的关键路径的可用状况
*
* </pre>
*
*
* Created by shenhongxi on 2020/7/31.
*/
public abstract class AbstractEndpointFactory implements EndpointFactory {
private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointFactory.class);
/** 维持share channel 的service列表 **/
protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<>();
protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<>();
@Override
public Server createServer(URL url, MessageHandler messageHandler) {
synchronized (ipPort2ServerShareChannel) {
String ipPort = url.getServerPortStr();
String protocolKey = SummerFrameworkUtils.getProtocolKey(url);
boolean shareChannel = url.getBooleanParameter(URLParamType.shareChannel.getName(),
URLParamType.shareChannel.boolValue());
if (!shareChannel) { // 独享一个端口
logger.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);
// 如果端口已经被使用了,使用该server bind 会有异常
return innerCreateServer(url, messageHandler);
}
logger.info(this.getClass().getSimpleName() + " create share_channel server: url={}", url);
Server server = ipPort2ServerShareChannel.get(ipPort);
if (server != null) {
// can't share service channel
if (!SummerFrameworkUtils.checkIfCanShareServiceChannel(server.getUrl(), url)) {
throw new SummerFrameworkException(
"Service export Error: share channel but some config param is different, " +
"protocol or codec or serialize or maxContentLength or maxServerConnection " +
"or maxWorkerThread or heartbeatFactory, source="
+ server.getUrl() + " target=" + url, SummerErrorMsgConstants.FRAMEWORK_EXPORT_ERROR);
}
saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
return server;
}
url = url.createCopy();
url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空
server = innerCreateServer(url, messageHandler);
ipPort2ServerShareChannel.put(ipPort, server);
saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
return server;
}
}
@Override
public Client createClient(URL url) {
logger.info(this.getClass().getSimpleName() + " create client: url={}", url);
return innerCreateClient(url);
}
@Override
public void safeReleaseResource(Server server, URL url) {
boolean shareChannel = url.getBooleanParameter(URLParamType.shareChannel.getName(),
URLParamType.shareChannel.boolValue());
if (!shareChannel) {
destroy(server);
return;
}
synchronized (ipPort2ServerShareChannel) {
String ipPort = url.getServerPortStr();
String protocolKey = SummerFrameworkUtils.getProtocolKey(url);
if (server != ipPort2ServerShareChannel.get(ipPort)) {
destroy(server);
return;
}
Set<String> urls = server2UrlsShareChannel.get(server);
urls.remove(protocolKey);
if (urls.isEmpty()) {
destroy(server);
ipPort2ServerShareChannel.remove(ipPort);
server2UrlsShareChannel.remove(server);
}
}
}
@Override
public void safeReleaseResource(Client client, URL url) {
destroy(client);
}
private <T extends Endpoint> void destroy(T endpoint) {
endpoint.close();
}
protected abstract Server innerCreateServer(URL url, MessageHandler messageHandler);
protected abstract Client innerCreateClient(URL url);
private <T> void saveEndpoint2Urls(ConcurrentMap<T, Set<String>> map, T endpoint, String namespace) {
Set<String> sets = map.get(endpoint);
if (sets == null) {
sets = new HashSet<>();
sets.add(namespace);
// 规避并发问题,因为有release逻辑存在,所以这里的sets预先add了namespace
map.putIfAbsent(endpoint, sets);
sets = map.get(endpoint);
}
sets.add(namespace);
}
}
org.hongxi.summer.protocol.summer.SummerCodec
\ No newline at end of file
org.hongxi.summer.serialize.Hessian2Serialization
org.hongxi.summer.serialize.FastJsonSerialization
\ No newline at end of file
......@@ -9,43 +9,17 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-remoting</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>Transport of RPC</description>
<artifactId>summer-transport-netty</artifactId>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<groupId>org.hongxi</groupId>
<artifactId>summer-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>1.1.33.Fork26</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<optional>true</optional>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
......
package org.hongxi.summer.transport.netty;
import org.hongxi.summer.common.threadpool.DefaultThreadFactory;
import org.hongxi.summer.common.threadpool.StandardThreadPoolExecutor;
import org.hongxi.summer.transport.SharedObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by shenhongxi on 2020/7/30.
*/
public class NettyChannelFactory implements SharedObjectFactory<NettyChannel> {
private static final Logger logger = LoggerFactory.getLogger(NettyChannelFactory.class);
private static final ExecutorService rebuildExecutorService = new StandardThreadPoolExecutor(
5, 30, 10L, TimeUnit.SECONDS, 100,
new DefaultThreadFactory("RebuildExecutorService", true),
new ThreadPoolExecutor.CallerRunsPolicy());
private NettyClient nettyClient;
private String factoryName;
public NettyChannelFactory(NettyClient nettyClient) {
this.nettyClient = nettyClient;
this.factoryName = "NettyChannelFactory_" + nettyClient.getUrl().getHost() +
"_" + nettyClient.getUrl().getPort();
}
@Override
public NettyChannel makeObject() {
return new NettyChannel(nettyClient);
}
@Override
public boolean rebuildObject(NettyChannel nettyChannel, boolean async) {
ReentrantLock lock = nettyChannel.getLock();
if (lock.tryLock()) {
try {
if (!nettyChannel.isAvailable() && !nettyChannel.isReconnect()) {
nettyChannel.reconnect();
if (async) {
rebuildExecutorService.submit(new RebuildTask(nettyChannel));
} else {
nettyChannel.close();
nettyChannel.open();
logger.info("rebuild channel success: {}", nettyChannel.getUrl());
}
}
} catch (Exception e) {
logger.error("rebuild error: {}, {}", this.toString(), nettyChannel.getUrl(), e);
} finally {
lock.unlock();
}
return true;
}
return false;
}
@Override
public String toString() {
return factoryName;
}
class RebuildTask implements Runnable {
private NettyChannel channel;
public RebuildTask(NettyChannel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
channel.getLock().lock();
channel.close();
channel.open();
logger.info("rebuild channel success: {}", channel.getUrl());
} catch (Exception e) {
logger.error("rebuild error: {}, {}", this.toString(), channel.getUrl(), e);
} finally {
channel.getLock().unlock();
}
}
}
}
package org.hongxi.summer.transport.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.hongxi.summer.CodecUtils;
import org.hongxi.summer.codec.Codec;
import org.hongxi.summer.common.SummerConstants;
import org.hongxi.summer.common.util.SummerFrameworkUtils;
import org.hongxi.summer.exception.SummerFrameworkException;
import org.hongxi.summer.exception.SummerServiceException;
import org.hongxi.summer.protocol.summer.SummerCodec;
import org.hongxi.summer.rpc.Response;
import org.hongxi.summer.transport.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by shenhongxi on 2020/7/6.
*/
public class NettyDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class);
private Codec codec;
private Channel channel;
private int maxContentLength;
public NettyDecoder(Codec codec, Channel channel, int maxContentLength) {
this.codec = codec;
this.channel = channel;
this.maxContentLength = maxContentLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() <= SummerCodec.HEADER_LENGTH) {
return;
}
in.markReaderIndex();
short type = in.readShort();
if (type != SummerConstants.NETTY_MAGIC_TYPE) {
in.resetReaderIndex();
throw new SummerFrameworkException("NettyDecoder transport header not support, type: " + type);
}
in.skipBytes(1);
int rpcVersion = (in.readByte() & 0xff) >>> 3;
decodeV1(ctx, in, out);
}
private void decodeV1(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
long startTime = System.currentTimeMillis();
in.resetReaderIndex();
in.skipBytes(2);// skip magic num
byte messageType = (byte) in.readShort();
long requestId = in.readLong();
int dataLength = in.readInt();
boolean isRequest = messageType == SummerConstants.FLAG_REQUEST;
checkMaxContent(dataLength, ctx, in, isRequest, requestId);
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
NettyMessage message = new NettyMessage(isRequest, requestId, data);
out.add(message);
message.setStartTime(startTime);
}
private void checkMaxContent(int dataLength, ChannelHandlerContext ctx, ByteBuf byteBuf, boolean isRequest, long requestId) throws Exception {
if (maxContentLength > 0 && dataLength > maxContentLength) {
logger.warn("transport data content length over of limit, size: {} > {}. remote={} local={}",
dataLength, maxContentLength, ctx.channel().remoteAddress(), ctx.channel().localAddress());
// skip all readable Bytes in order to release this no-readable bytebuf in super.channelRead()
// that avoid this.decode() being invoked again after channel.close()
byteBuf.skipBytes(byteBuf.readableBytes());
Exception e = new SummerServiceException("NettyDecoder transport data content length over of limit, size: " + dataLength + " > " + maxContentLength);
if (isRequest) {
Response response = SummerFrameworkUtils.buildErrorResponse(requestId, e);
byte[] msg = CodecUtils.encodeObjectToBytes(channel, codec, response);
ctx.channel().writeAndFlush(msg);
}
throw e;
}
}
}
org.hongxi.summer.transport.netty.NettyEndpointFactory
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册