提交 f9e52853 编写于 作者: 白coderT's avatar 白coderT

Initial commit

上级
# maven ignore
target/
*.war
*.zip
*.tar
*.tar.gz
# eclipse ignore
.settings/
.project
.classpath
# idea ignore
.idea/
*.ipr
*.iml
*.iws
# temp ignore
*.log
*.logs
*.cache
*.diff
*.patch
*.tmp
*.java~
*.properties~
*.xml~
# system ignore
.DS_Store
Thumbs.db
\ No newline at end of file
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright 2018-yyyy 冰河.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
================================================================
Dependencies:
Spring:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://www.springsource.org
Javassist:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://www.jboss.org/javassist
Netty:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://netty.io
Mina:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://mina.apache.org
Grizzly:
* LICENSE:
* http://www.gnu.org/licenses/gpl-2.0.html (General Public License 2.0)
* HOMEPAGE:
* http://grizzly.java.net
HttpClient:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://hc.apache.org
Hessian:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://hessian.caucho.com
XStream:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://xstream.codehaus.org
FastJson:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://code.alibabatech.com/wiki/fastjson
Zookeeper:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://zookeeper.apache.org
Jedis:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://code.google.com/p/jedis
XMemcached:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://code.google.com/p/xmemcached
Jetty:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://jetty.mortbay.org
Thrift:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://thrift.apache.org
CXF:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://cxf.apache.org
ZKClient:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* https://github.com/sgroschupf/zkclient
Curator
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* https://github.com/Netflix/curator
JFreeChart:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://www.jfree.org
HibernateValidator:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://www.hibernate.org/subprojects/validator.html
CommonsLogging:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging
SLF4J:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://www.slf4j.org
Log4j:
* LICENSE:
* http://www.apache.org/licenses/LICENSE-2.0 (Apache License 2.0)
* HOMEPAGE:
* http://log4j.apache.org
# 作者及联系方式
作者:冰河
微信:hacker_binghe
QQ:2711098650
微信公众号: 冰河技术
<div align="center">
<a href="https://github.com/binghe001/BingheGuide">关注冰河技术,解锁更多技能!</a>
</div>
## 加群交流
本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “冰河” 微信(hacker_binghe),备注:`学习加群`
<div align="center">
<img src="https://binghe001.github.io/images/personal/hacker_binghe.jpg?raw=true" width="180px">
<div style="font-size: 9px;">冰河微信</div>
<br/>
</div>
## 公众号
分享各种编程语言、开发技术、分布式与微服务架构、分布式数据库、分布式事务、云原生、大数据与云计算技术和渗透技术。另外,还会分享各种面试题和面试技巧。内容在 **冰河技术** 微信公众号首发,强烈建议大家关注。
<div align="center">
<img src="https://img-blog.csdnimg.cn/20210426115714643.jpg?raw=true" width="180px">
<div style="font-size: 9px;">公众号:冰河技术</div>
<br/>
</div>
## 星球
加入星球 **[冰河技术](http://m6z.cn/6aeFbs)**,可以获得本站点所有学习内容的指导与帮助。如果你遇到不能独立解决的问题,也可以添加冰河的微信:**hacker_binghe**, 我们一起沟通交流。另外,在星球中不只能学到实用的硬核技术,还能学习**实战项目**
关注 [冰河技术](https://img-blog.csdnimg.cn/20210426115714643.jpg?raw=true)公众号,回复 `星球` 可以获取入场优惠券。
<div align="center">
<img src="https://binghe001.github.io/images/personal/xingqiu.png?raw=true" width="180px">
<div style="font-size: 9px;">知识星球:冰河技术</div>
<br/>
</div>
## 我的新书
<div align="center">
<img src="https://img-blog.csdnimg.cn/fe76310aea734752b3b79c4df1438943.jpeg?raw=true" width="80%">
<div style="font-size: 9px;"><a href="https://item.jd.com/13190783.html">《深入理解高并发编程:核心原理与案例实战》</a></div>
<br/>
</div>
<div align="center">
<img src="https://img-blog.csdnimg.cn/5ee367b68023466a87f66763a64a4133.jpg?raw=true" width="100%">
<div style="font-size: 9px;"><a href="https://item.jd.com/12972343.html">《深入理解分布式事务:原理与实战》</a></div>
<br/>
</div>
<div align="center">
<img src="https://img-blog.csdnimg.cn/20210426115257555.png?raw=true" width="80%">
<div style="font-size: 9px;"><a href="https://item.jd.com/13036154.html">《MySQL技术大全:开发、优化与运维实战》</a></div>
<br/>
</div>
<div align="center">
<img src="https://img-blog.csdnimg.cn/20200828011209412.png?raw=true" width="80%">
<div style="font-size: 9px;"><a href="https://item.jd.com/12710993.html">《海量数据处理与大数据技术实战》</a></div>
<br/>
</div>
# 冰河整理PDF
关注 **冰河技术** 微信公众号:
回复 “**并发编程**” 领取《深入理解高并发编程(第1版)》PDF电子书。
回复 “**并发源码**” 领取《并发编程核心知识(源码分析篇 第1版)》PDF电子书。
回复 “**渗透笔记**” 领取《冰河的渗透实战笔记》PDF电子书。
回复 “**我要进大厂**” 领取《我要进大厂系列之面试圣经(第1版)》PDF电子书。
回复 ”**限流**“ 领取《亿级流量下的分布式解决方案》PDF电子书。
回复 “**设计模式**” 领取《深入浅出Java23种设计模式》PDF电子书。
回复 “**Java8新特性**” 领取 《Java8新特性教程》PDF电子书。
回复 “**分布式存储**” 领取《跟冰河学习分布式存储技术》 PDF电子书。
回复 “**Nginx**” 领取《跟冰河学习Nginx技术》PDF电子书。
回复 “**互联网工程**” 领取《跟冰河学习互联网工程技术》PDF电子书。
回复 “**mysql基础**” 领取《MySQL核心知识手册》PDF电子书。
回复 “**冰河索引**” 领取《冰河技术公号文章索引》PDF电子书。
回复 “**ngx2**” 领取《Nginx核心技术手册》PDF电子书。
# 项目介绍
bhrpc是一款分布式的、高性能、可扩展的RPC框架。
bhrpc框架支持如下功能:
1.注册中心的插件化动态扩展
2.序列化方式插件化动态扩展
3.动态代理插件化动态扩展
4.服务的动态注册与发现
5.参数验证、结果缓存
6.服务调用区分版本号(对标Dubbo)
7.超时重试
8.同步调用、异步调用、异步执行
9.异步回调
10.事件通知
11.SPI扩展技术
12.支持原生RPC
13.支持基于Spring(XML/注解)整合RPC
14.支持SpringBoot整合RPC
15.支持Docker整合RPC
16.支持K8S整合RPC
待扩展功能:
1.路由控制与管理
2.流控分析与管理
3.限流、熔断、降级
4.并发控制、连接控制、延迟连接
5.全链路监控
6.多SDK建设
7.其他暂时还未想到的扩展
# 文章合集
* [这次我设计了一款TPS百万级别的分布式、高性能、可扩展的RPC框架](https://mp.weixin.qq.com/s/5HOUg49X0xQmkQjMiWnaIg)
* [《RPC手撸专栏》第1章:开篇,从零开始手撸一个能在实际场景使用的高性能RPC框架](https://articles.zsxq.com/id_6gfgwev2uw0p.html)
* [《RPC手撸专栏》第2章:高性能分布式RPC框架整体架构设计](https://articles.zsxq.com/id_xvd5up1u16nx.html)
* [《RPC手撸专栏》第3章:RPC服务核心注解的设计与实现](https://articles.zsxq.com/id_zr6w6dvgdc95.html)
* [《RPC手撸专栏》第4章:实现RPC服务核心注解的扫描与解析](https://articles.zsxq.com/id_bal2cnmw3jbi.html)
* [《RPC手撸专栏》第5章:服务提供者收发消息基础功能实现](https://articles.zsxq.com/id_df11g9wmm8ad.html)
* [《RPC手撸专栏》第6章:自定义网络传输协议的实现](https://articles.zsxq.com/id_qgntsrvlljea.html)
* [《RPC手撸专栏》第7章:自定义网络编解码的实现](https://articles.zsxq.com/id_5wqs9pshkwep.html)
* [《RPC手撸专栏》第8章:模拟服务消费者与服务提供者之间的数据交互](https://articles.zsxq.com/id_cd0dpx73a4uk.html)
* [《RPC手撸专栏》第9章:服务提供者调用真实方法的实现](https://articles.zsxq.com/id_fwxlxhq2bthz.html)
* [《RPC手撸专栏》第10章:测试服务提供者调用真实方法](https://articles.zsxq.com/id_1uamc23nb7ws.html)
* [《RPC手撸专栏》第11章:服务提供者扩展支持CGLib调用真实方法](https://articles.zsxq.com/id_wjwsso804nip.html)
* [《RPC手撸专栏》第12章:实现服务消费者与服务提供者直接通信](https://articles.zsxq.com/id_wpfhwa85newn.html)
* [《RPC手撸专栏》第13章:服务消费者异步转同步直接获取返回结果](https://articles.zsxq.com/id_dfzr8j7xsn7e.html)
* [《RPC手撸专栏》第14章:服务消费者异步转同步的自定义Future与AQS实现](https://articles.zsxq.com/id_6v8wcbaaitg4.html)
* [《RPC手撸专栏》第15章:服务消费者同步、异步、单向调用的实现](https://articles.zsxq.com/id_oixsibgwepmu.html)
* [《RPC手撸专栏》第16章:服务消费者回调方法的实现](https://articles.zsxq.com/id_7nlqbj3z8rgw.html)
* [《RPC手撸专栏》第17章:服务消费者实现动态代理功能屏蔽构建请求协议对象的细节](https://articles.zsxq.com/id_bv0jkh6fhr1w.html)
* [《RPC手撸专栏》第18章:服务消费者整合动态代理实现直接调用接口返回结果数据](https://articles.zsxq.com/id_r6u3xt22wf67.html)
* [《RPC手撸专栏》第19章:服务消费者动态代理实现异步调用](https://articles.zsxq.com/id_ciz8m2zcl6nt.html)
* [《RPC手撸专栏》第20章:服务消费者动态代理扩展优化](https://articles.zsxq.com/id_o9c21x3zm7nb.html)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-annotation</artifactId>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.annotation;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author binghe
* @version 1.0.0
* @description bhrpc服务消费者
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
/**
* 版本号
*/
String version() default "1.0.0";
/**
* 注册中心类型, 目前的类型包含:zookeeper、nacos、etcd、consul
*/
String registryType() default "zookeeper";
/**
* 注册地址
*/
String registryAddress() default "127.0.0.1:2181";
/**
* 负载均衡类型,默认基于ZK的一致性Hash
*/
String loadBalanceType() default "zkconsistenthash";
/**
* 序列化类型,目前的类型包含:protostuff、kryo、json、jdk、hessian2、fst
*/
String serializationType() default "protostuff";
/**
* 超时时间,默认5s
*/
long timeout() default 5000;
/**
* 是否异步执行
*/
boolean async() default false;
/**
* 是否单向调用
*/
boolean oneway() default false;
/**
* 代理的类型,jdk:jdk代理, javassist: javassist代理, cglib: cglib代理
*/
String proxy() default "jdk";
/**
* 服务分组,默认为空
*/
String group() default "";
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.annotation;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author binghe
* @version 1.0.0
* @description bhrpc服务提供者注解
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
/**
* 接口的Class
*/
Class<?> interfaceClass() default void.class;
/**
* 接口的ClassName
*/
String interfaceClassName() default "";
/**
* 版本号
*/
String version() default "1.0.0";
/**
* 服务分组,默认为空
*/
String group() default "";
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-codec</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-serialization-jdk</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.codec;
import io.binghe.rpc.serialization.api.Serialization;
import io.binghe.rpc.serialization.jdk.JdkSerialization;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 实现编解码的接口,提供序列化和反序列化的默认方法
*/
public interface RpcCodec {
default Serialization getJdkSerialization(){
return new JdkSerialization();
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.codec;
import io.binghe.rpc.common.utils.SerializationUtils;
import io.binghe.rpc.constants.RpcConstants;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.enumeration.RpcType;
import io.binghe.rpc.protocol.header.RpcHeader;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.protocol.response.RpcResponse;
import io.binghe.rpc.serialization.api.Serialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 实现RPC解码操作
*/
public class RpcDecoder extends ByteToMessageDecoder implements RpcCodec {
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < RpcConstants.HEADER_TOTAL_LEN) {
return;
}
in.markReaderIndex();
short magic = in.readShort();
if (magic != RpcConstants.MAGIC) {
throw new IllegalArgumentException("magic number is illegal, " + magic);
}
byte msgType = in.readByte();
byte status = in.readByte();
long requestId = in.readLong();
ByteBuf serializationTypeByteBuf = in.readBytes(SerializationUtils.MAX_SERIALIZATION_TYPE_COUNR);
String serializationType = SerializationUtils.subString(serializationTypeByteBuf.toString(CharsetUtil.UTF_8));
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
RpcType msgTypeEnum = RpcType.findByType(msgType);
if (msgTypeEnum == null) {
return;
}
RpcHeader header = new RpcHeader();
header.setMagic(magic);
header.setStatus(status);
header.setRequestId(requestId);
header.setMsgType(msgType);
header.setSerializationType(serializationType);
header.setMsgLen(dataLength);
//TODO Serialization是扩展点
Serialization serialization = getJdkSerialization();
switch (msgTypeEnum) {
case REQUEST:
RpcRequest request = serialization.deserialize(data, RpcRequest.class);
if (request != null) {
RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
protocol.setHeader(header);
protocol.setBody(request);
out.add(protocol);
}
break;
case RESPONSE:
RpcResponse response = serialization.deserialize(data, RpcResponse.class);
if (response != null) {
RpcProtocol<RpcResponse> protocol = new RpcProtocol<>();
protocol.setHeader(header);
protocol.setBody(response);
out.add(protocol);
}
break;
case HEARTBEAT:
// TODO
break;
}
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.codec;
import io.binghe.rpc.common.utils.SerializationUtils;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeader;
import io.binghe.rpc.serialization.api.Serialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 实现RPC编码
*/
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> implements RpcCodec {
@Override
protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception {
RpcHeader header = msg.getHeader();
byteBuf.writeShort(header.getMagic());
byteBuf.writeByte(header.getMsgType());
byteBuf.writeByte(header.getStatus());
byteBuf.writeLong(header.getRequestId());
String serializationType = header.getSerializationType();
//TODO Serialization是扩展点
Serialization serialization = getJdkSerialization();
byteBuf.writeBytes(SerializationUtils.paddingString(serializationType).getBytes("UTF-8"));
byte[] data = serialization.serialize(msg.getBody());
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-common</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-annotation</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.exception;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description SerializerException
*/
public class SerializerException extends RuntimeException {
private static final long serialVersionUID = -6783134254669118520L;
/**
* Instantiates a new Serializer exception.
*
* @param e the e
*/
public SerializerException(final Throwable e) {
super(e);
}
/**
* Instantiates a new Serializer exception.
*
* @param message the message
*/
public SerializerException(final String message) {
super(message);
}
/**
* Instantiates a new Serializer exception.
*
* @param message the message
* @param throwable the throwable
*/
public SerializerException(final String message, final Throwable throwable) {
super(message, throwable);
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.helper;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description RPC服务帮助类
*/
public class RpcServiceHelper {
/**
* 拼接字符串
* @param serviceName 服务名称
* @param serviceVersion 服务版本号
* @param group 服务分组
* @return 服务名称#服务版本号#服务分组
*/
public static String buildServiceKey(String serviceName, String serviceVersion, String group) {
return String.join("#", serviceName, serviceVersion, group);
}
}
package io.binghe.rpc.common.id;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author binghe
* @version 1.0.0
* @description 简易ID工厂类
*/
public class IdFactory {
private final static AtomicLong REQUEST_ID_GEN = new AtomicLong(0);
public static Long getId(){
return REQUEST_ID_GEN.incrementAndGet();
}
}
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.scanner;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
/**
* @author binghe
* @version 1.0.0
* @description 类扫描器
*/
public class ClassScanner {
/**
* 文件
*/
private static final String PROTOCOL_FILE = "file";
/**
* jar包
*/
private static final String PROTOCOL_JAR = "jar";
/**
* class文件的后缀
*/
private static final String CLASS_FILE_SUFFIX = ".class";
/**
* 扫描指定包下的所有类信息
* @param packageName 指定的包名
* @return 指定包下所有的完整类名的List集合
* @throws Exception
*/
public static List<String> getClassNameList(String packageName) throws Exception{
//第一个class类的集合
List<String> classNameList = new ArrayList<String>();
//是否循环迭代
boolean recursive = true;
//获取包的名字 并进行替换
String packageDirName = packageName.replace('.', '/');
//定义一个枚举的集合 并进行循环来处理这个目录下的things
Enumeration<URL> dirs = Thread.currentThread().getContextClassLoader().getResources(packageDirName);
//循环迭代下去
while (dirs.hasMoreElements()){
//获取下一个元素
URL url = dirs.nextElement();
//得到协议的名称
String protocol = url.getProtocol();
//如果是以文件的形式保存在服务器上
if (PROTOCOL_FILE.equals(protocol)) {
//获取包的物理路径
String filePath = URLDecoder.decode(url.getFile(), "UTF-8");
//以文件的方式扫描整个包下的文件 并添加到集合中
findAndAddClassesInPackageByFile(packageName, filePath, recursive, classNameList);
} else if (PROTOCOL_JAR.equals(protocol)){
packageName = findAndAddClassesInPackageByJar(packageName, classNameList, recursive, packageDirName, url);
}
}
return classNameList;
}
/**
* 扫描Jar文件中指定包下的所有类信息
* @param packageName 扫描的包名
* @param classNameList 完成类名存放的List集合
* @param recursive 是否递归调用
* @param packageDirName 当前包名的前面部分的名称
* @param url 包的url地址
* @return 处理后的包名,以供下次调用使用
* @throws IOException
*/
private static String findAndAddClassesInPackageByJar(String packageName, List<String> classNameList, boolean recursive, String packageDirName, URL url) throws IOException {
//如果是jar包文件
//定义一个JarFile
JarFile jar = ((JarURLConnection) url.openConnection()).getJarFile();
//从此jar包 得到一个枚举类
Enumeration<JarEntry> entries = jar.entries();
//同样的进行循环迭代
while (entries.hasMoreElements()) {
//获取jar里的一个实体 可以是目录 和一些jar包里的其他文件 如META-INF等文件
JarEntry entry = entries.nextElement();
String name = entry.getName();
//如果是以/开头的
if (name.charAt(0) == '/') {
//获取后面的字符串
name = name.substring(1);
}
//如果前半部分和定义的包名相同
if (name.startsWith(packageDirName)) {
int idx = name.lastIndexOf('/');
//如果以"/"结尾 是一个包
if (idx != -1) {
//获取包名 把"/"替换成"."
packageName = name.substring(0, idx).replace('/', '.');
}
//如果可以迭代下去 并且是一个包
if ((idx != -1) || recursive){
//如果是一个.class文件 而且不是目录
if (name.endsWith(CLASS_FILE_SUFFIX) && !entry.isDirectory()) {
//去掉后面的".class" 获取真正的类名
String className = name.substring(packageName.length() + 1, name.length() - 6);
classNameList.add(packageName + '.' + className);
}
}
}
}
return packageName;
}
/**
* 扫描当前工程中指定包下的所有类信息
* @param packageName 扫描的包名
* @param packagePath 包在磁盘上的完整路径
* @param recursive 是否递归调用
* @param classNameList 类名称的集合
*/
private static void findAndAddClassesInPackageByFile(String packageName, String packagePath, final boolean recursive, List<String> classNameList){
//获取此包的目录 建立一个File
File dir = new File(packagePath);
//如果不存在或者 也不是目录就直接返回
if (!dir.exists() || !dir.isDirectory()) {
return;
}
//如果存在 就获取包下的所有文件 包括目录
File[] dirfiles = dir.listFiles(new FileFilter() {
//自定义过滤规则 如果可以循环(包含子目录) 或则是以.class结尾的文件(编译好的java类文件)
public boolean accept(File file) {
return (recursive && file.isDirectory()) || (file.getName().endsWith(".class"));
}
});
//循环所有文件
for (File file : dirfiles) {
//如果是目录 则继续扫描
if (file.isDirectory()) {
findAndAddClassesInPackageByFile(packageName + "." + file.getName(),
file.getAbsolutePath(),
recursive,
classNameList);
}else {
//如果是java类文件 去掉后面的.class 只留下类名
String className = file.getName().substring(0, file.getName().length() - 6);
//添加到集合中去
classNameList.add(packageName + '.' + className);
}
}
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.scanner.reference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author binghe
* @version 1.0.0
* @description 存在@RpcReference注解字段代理实例的上下文
*/
public class RpcReferenceContext {
private static volatile Map<String, Object> instance;
static {
instance = new ConcurrentHashMap<>();
}
public static void put(String key, Object value){
instance.put(key, value);
}
public static Object get(String key){
return instance.get(key);
}
public static Object remove(String key){
return instance.remove(key);
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.scanner.reference;
import io.binghe.rpc.annotation.RpcReference;
import io.binghe.rpc.common.scanner.ClassScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* @author binghe
* @version 1.0.0
* @description @RpcReference注解扫描器
*/
public class RpcReferenceScanner extends ClassScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcReferenceScanner.class);
/**
* 扫描指定包下的类,并筛选使用@RpcService注解标注的类
*/
public static Map<String, Object> doScannerWithRpcReferenceAnnotationFilter(/*String host, int port, */ String scanPackage/*, RegistryService registryService*/) throws Exception{
Map<String, Object> handlerMap = new HashMap<>();
List<String> classNameList = getClassNameList(scanPackage);
if (classNameList == null || classNameList.isEmpty()){
return handlerMap;
}
classNameList.stream().forEach((className) -> {
try {
Class<?> clazz = Class.forName(className);
Field[] declaredFields = clazz.getDeclaredFields();
Stream.of(declaredFields).forEach((field) -> {
RpcReference rpcReference = field.getAnnotation(RpcReference.class);
if (rpcReference != null){
//TODO 处理后续逻辑,将@RpcReference注解标注的接口引用代理对象,放入全局缓存中
LOGGER.info("当前标注了@RpcReference注解的字段名称===>>> " + field.getName());
LOGGER.info("@RpcReference注解上标注的属性信息如下:");
LOGGER.info("version===>>> " + rpcReference.version());
LOGGER.info("group===>>> " + rpcReference.group());
LOGGER.info("registryType===>>> " + rpcReference.registryType());
LOGGER.info("registryAddress===>>> " + rpcReference.registryAddress());
}
});
} catch (Exception e) {
LOGGER.error("scan classes throws exception: {}", e);
}
});
return handlerMap;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.scanner.server;
import io.binghe.rpc.annotation.RpcService;
import io.binghe.rpc.common.helper.RpcServiceHelper;
import io.binghe.rpc.common.scanner.ClassScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author binghe
* @version 1.0.0
* @description @RpcService注解扫描器
*/
public class RpcServiceScanner extends ClassScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcServiceScanner.class);
/**
* 扫描指定包下的类,并筛选使用@RpcService注解标注的类
*/
public static Map<String, Object> doScannerWithRpcServiceAnnotationFilterAndRegistryService(String scanPackage) throws Exception{
Map<String, Object> handlerMap = new HashMap<>();
List<String> classNameList = getClassNameList(scanPackage);
if (classNameList == null || classNameList.isEmpty()){
return handlerMap;
}
classNameList.stream().forEach((className) -> {
try {
Class<?> clazz = Class.forName(className);
RpcService rpcService = clazz.getAnnotation(RpcService.class);
if (rpcService != null){
//优先使用interfaceClass, interfaceClass的name为空,再使用interfaceClassName
//TODO 后续逻辑向注册中心注册服务元数据,同时向handlerMap中记录标注了RpcService注解的类实例
//handlerMap中的Key先简单存储为serviceName+version+group,后续根据实际情况处理key
String serviceName = getServiceName(rpcService);
String key = RpcServiceHelper.buildServiceKey(serviceName, rpcService.version(), rpcService.group());
handlerMap.put(key, clazz.newInstance());
}
} catch (Exception e) {
LOGGER.error("scan classes throws exception: {}", e);
}
});
return handlerMap;
}
/**
* 获取serviceName
*/
private static String getServiceName(RpcService rpcService){
//优先使用interfaceClass
Class clazz = rpcService.interfaceClass();
if (clazz == void.class){
return rpcService.interfaceClassName();
}
String serviceName = clazz.getName();
if (serviceName == null || serviceName.trim().isEmpty()){
serviceName = rpcService.interfaceClassName();
}
return serviceName;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 服务消费者线程池
*/
public class ClientThreadPool {
private static ThreadPoolExecutor threadPoolExecutor;
static{
threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));
}
public static void submit(Runnable task){
threadPoolExecutor.submit(task);
}
public static void shutdown() {
threadPoolExecutor.shutdown();
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 服务提供者端线程池
*/
public class ServerThreadPool {
private static ThreadPoolExecutor threadPoolExecutor;
static {
threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));
}
public static void submit(Runnable task){
threadPoolExecutor.submit(task);
}
public static void shutdown(){
threadPoolExecutor.shutdown();
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.common.utils;
import java.util.stream.IntStream;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 序列化时针对消息头序列化类型的操作
*/
public class SerializationUtils {
private static final String PADDING_STRING = "0";
/**
* 约定序列化类型最大长度为16
*/
public static final int MAX_SERIALIZATION_TYPE_COUNR = 16;
/**
* 为长度不足16的字符串后面补0
* @param str 原始字符串
* @return 补0后的字符串
*/
public static String paddingString(String str){
str = transNullToEmpty(str);
if (str.length() >= MAX_SERIALIZATION_TYPE_COUNR) return str;
int paddingCount = MAX_SERIALIZATION_TYPE_COUNR - str.length();
StringBuilder paddingString = new StringBuilder(str);
IntStream.range(0, paddingCount).forEach((i) -> {
paddingString.append(PADDING_STRING);
});
return paddingString.toString();
}
/**
* 字符串去0 操作
* @param str 原始字符串
* @return 去0后的字符串
*/
public static String subString(String str){
str = transNullToEmpty(str);
return str.replace(PADDING_STRING, "");
}
public static String transNullToEmpty(String str){
return str == null ? "" : str;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-constants</artifactId>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.constants;
/**
* @author binghe
* @version 1.0.0
* @description 常量类
*/
public class RpcConstants {
/**
* 消息头,固定32个字节
*/
public static final int HEADER_TOTAL_LEN = 32;
/**
* 魔数
*/
public static final short MAGIC = 0x10;
/**
* 版本号
*/
public static final byte VERSION = 0x1;
/**
* REFLECT_TYPE_JDK
*/
public static final String REFLECT_TYPE_JDK = "jdk";
/**
* REFLECT_TYPE_CGLIB
*/
public static final String REFLECT_TYPE_CGLIB = "cglib";
/**
* JDK动态代理
*/
public static final String PROXY_JDK = "jdk";
/**
* javassist动态代理
*/
public static final String PROXY_JAVASSIST = "javassist";
/**
* cglib动态代理
*/
public static final String PROXY_CGLIB = "cglib";
/**
* 初始化的方法
*/
public static final String INIT_METHOD_NAME = "init";
/**
* zookeeper
*/
public static final String REGISTRY_CENTER_ZOOKEEPER = "zookeeper";
/**
* nacos
*/
public static final String REGISTRY_CENTER_NACOS = "nacos";
/**
* apoll
*/
public static final String REGISTRY_CENTER_APOLL = "apoll";
/**
* etcd
*/
public static final String REGISTRY_CENTER_ETCD = "etcd";
/**
* eureka
*/
public static final String REGISTRY_CENTER_EUREKA = "eureka";
/**
* protostuff 序列化
*/
public static final String SERIALIZATION_PROTOSTUFF = "protostuff";
/**
* FST 序列化
*/
public static final String SERIALIZATION_FST = "fst";
/**
* hessian2 序列化
*/
public static final String SERIALIZATION_HESSIAN2 = "hessian2";
/**
* jdk 序列化
*/
public static final String SERIALIZATION_JDK = "jdk";
/**
* json 序列化
*/
public static final String SERIALIZATION_JSON = "json";
/**
* kryo 序列化
*/
public static final String SERIALIZATION_KRYO = "kryo";
/**
* 基于ZK的一致性Hash负载均衡
*/
public static final String SERVICE_LOAD_BALANCER_ZKCONSISTENTHASH = "zkconsistenthash";
public static void main(String[] args){
String str = "test0000000000000000";
System.out.println(str.replace("0", ""));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-consumer</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-consumer-common</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-proxy-jdk</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.consumer.common;
import io.binghe.rpc.common.threadpool.ClientThreadPool;
import io.binghe.rpc.consumer.common.handler.RpcConsumerHandler;
import io.binghe.rpc.consumer.common.initializer.RpcConsumerInitializer;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.proxy.api.consumer.Consumer;
import io.binghe.rpc.proxy.api.future.RPCFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 服务消费者
*/
public class RpcConsumer implements Consumer {
private final Logger logger = LoggerFactory.getLogger(RpcConsumer.class);
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
private static volatile RpcConsumer instance;
private static Map<String, RpcConsumerHandler> handlerMap = new ConcurrentHashMap<>();
private RpcConsumer() {
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(4);
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new RpcConsumerInitializer());
}
public static RpcConsumer getInstance(){
if (instance == null){
synchronized (RpcConsumer.class){
if (instance == null){
instance = new RpcConsumer();
}
}
}
return instance;
}
public void close(){
eventLoopGroup.shutdownGracefully();
ClientThreadPool.shutdown();
}
//修改返回数据的类型
@Override
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {
//TODO 暂时写死,后续在引入注册中心时,从注册中心获取
String serviceAddress = "127.0.0.1";
int port = 27880;
String key = serviceAddress.concat("_").concat(String.valueOf(port));
RpcConsumerHandler handler = handlerMap.get(key);
//缓存中无RpcClientHandler
if (handler == null){
handler = getRpcConsumerHandler(serviceAddress, port);
handlerMap.put(key, handler);
}else if (!handler.getChannel().isActive()){ //缓存中存在RpcClientHandler,但不活跃
handler.close();
handler = getRpcConsumerHandler(serviceAddress, port);
handlerMap.put(key, handler);
}
RpcRequest request = protocol.getBody();
return handler.sendRequest(protocol, request.getAsync(), request.getOneway());
}
/**
* 创建连接并返回RpcClientHandler
*/
private RpcConsumerHandler getRpcConsumerHandler(String serviceAddress, int port) throws InterruptedException {
ChannelFuture channelFuture = bootstrap.connect(serviceAddress, port).sync();
channelFuture.addListener((ChannelFutureListener) listener -> {
if (channelFuture.isSuccess()) {
logger.info("connect rpc server {} on port {} success.", serviceAddress, port);
} else {
logger.error("connect rpc server {} on port {} failed.", serviceAddress, port);
channelFuture.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
return channelFuture.channel().pipeline().get(RpcConsumerHandler.class);
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.consumer.common.context;
import io.binghe.rpc.proxy.api.future.RPCFuture;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 保存RPC上下文
*/
public class RpcContext {
private RpcContext(){
}
/**
* RpcContext实例
*/
private static final RpcContext AGENT = new RpcContext();
/**
* 存放RPCFuture的InheritableThreadLocal
*/
private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();
/**
* 获取上下文
* @return RPC服务的上下文信息
*/
public static RpcContext getContext(){
return AGENT;
}
/**
* 将RPCFuture保存到线程的上下文
* @param rpcFuture
*/
public void setRPCFuture(RPCFuture rpcFuture){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);
}
/**
* 获取RPCFuture
*/
public RPCFuture getRPCFuture(){
return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();
}
/**
* 移除RPCFuture
*/
public void removeRPCFuture(){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.consumer.common.handler;
import com.alibaba.fastjson.JSONObject;
import io.binghe.rpc.consumer.common.context.RpcContext;
import io.binghe.rpc.proxy.api.future.RPCFuture;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeader;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.protocol.response.RpcResponse;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RPC消费者处理器
*/
public class RpcConsumerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
private final Logger logger = LoggerFactory.getLogger(RpcConsumerHandler.class);
private volatile Channel channel;
private SocketAddress remotePeer;
//存储请求ID与RpcResponse协议的映射关系
//private Map<Long, RpcProtocol<RpcResponse>> pendingResponse = new ConcurrentHashMap<>();
private Map<Long, RPCFuture> pendingRPC = new ConcurrentHashMap<>();
public Channel getChannel() {
return channel;
}
public SocketAddress getRemotePeer() {
return remotePeer;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.remotePeer = this.channel.remoteAddress();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
this.channel = ctx.channel();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcProtocol<RpcResponse> protocol) throws Exception {
if (protocol == null){
return;
}
logger.info("服务消费者接收到的数据===>>>{}", JSONObject.toJSONString(protocol));
RpcHeader header = protocol.getHeader();
long requestId = header.getRequestId();
RPCFuture rpcFuture = pendingRPC.remove(requestId);
if (rpcFuture != null){
rpcFuture.done(protocol);
}
}
/**
* 服务消费者向服务提供者发送请求
*/
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){
logger.info("服务消费者发送的数据===>>>{}", JSONObject.toJSONString(protocol));
return oneway ? this.sendRequestOneway(protocol) : async ? sendRequestAsync(protocol) : this.sendRequestSync(protocol);
}
private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
channel.writeAndFlush(protocol);
return rpcFuture;
}
private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
//如果是异步调用,则将RPCFuture放入RpcContext
RpcContext.getContext().setRPCFuture(rpcFuture);
channel.writeAndFlush(protocol);
return null;
}
private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {
channel.writeAndFlush(protocol);
return null;
}
private RPCFuture getRpcFuture(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = new RPCFuture(protocol);
RpcHeader header = protocol.getHeader();
long requestId = header.getRequestId();
pendingRPC.put(requestId, rpcFuture);
return rpcFuture;
}
public void close() {
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.consumer.common.initializer;
import io.binghe.rpc.codec.RpcDecoder;
import io.binghe.rpc.codec.RpcEncoder;
import io.binghe.rpc.consumer.common.handler.RpcConsumerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RpcConsumerInitializer
*/
public class RpcConsumerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline cp = channel.pipeline();
cp.addLast(new RpcEncoder());
cp.addLast(new RpcDecoder());
cp.addLast(new RpcConsumerHandler());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-consumer</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-consumer-native</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-consumer-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.consumer;
import io.binghe.rpc.consumer.common.RpcConsumer;
import io.binghe.rpc.proxy.api.ProxyFactory;
import io.binghe.rpc.proxy.api.async.IAsyncObjectProxy;
import io.binghe.rpc.proxy.api.config.ProxyConfig;
import io.binghe.rpc.proxy.api.object.ObjectProxy;
import io.binghe.rpc.proxy.jdk.JdkProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 服务消费客户端
*/
public class RpcClient {
private final Logger logger = LoggerFactory.getLogger(RpcClient.class);
/**
* 服务版本
*/
private String serviceVersion;
/**
* 服务分组
*/
private String serviceGroup;
/**
* 序列化类型
*/
private String serializationType;
/**
* 超时时间
*/
private long timeout;
/**
* 是否异步调用
*/
private boolean async;
/**
* 是否单向调用
*/
private boolean oneway;
public RpcClient(String serviceVersion, String serviceGroup, String serializationType, long timeout, boolean async, boolean oneway) {
this.serviceVersion = serviceVersion;
this.timeout = timeout;
this.serviceGroup = serviceGroup;
this.serializationType = serializationType;
this.async = async;
this.oneway = oneway;
}
public <T> T create(Class<T> interfaceClass) {
ProxyFactory proxyFactory = new JdkProxyFactory<T>();
proxyFactory.init(new ProxyConfig(interfaceClass, serviceVersion, serviceGroup, serializationType, timeout, RpcConsumer.getInstance(), async, oneway));
return proxyFactory.getProxy(interfaceClass);
}
public <T> IAsyncObjectProxy createAsync(Class<T> interfaceClass) {
return new ObjectProxy<T>(interfaceClass, serviceVersion, serviceGroup, serializationType, timeout, RpcConsumer.getInstance(), async, oneway);
}
public void shutdown() {
RpcConsumer.getInstance().close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-consumer</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-consumer-common</module>
<module>bhrpc-consumer-native</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-protocol</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-constants</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package io.binghe.rpc.protocol;
import io.binghe.rpc.protocol.header.RpcHeader;
import java.io.Serializable;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description Rpc协议
*/
public class RpcProtocol<T> implements Serializable {
private static final long serialVersionUID = 292789485166173277L;
/**
* 消息头
*/
private RpcHeader header;
/**
* 消息体
*/
private T body;
public RpcHeader getHeader() {
return header;
}
public void setHeader(RpcHeader header) {
this.header = header;
}
public T getBody() {
return body;
}
public void setBody(T body) {
this.body = body;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.protocol.base;
import java.io.Serializable;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 消息体基础类
*/
public class RpcMessage implements Serializable {
/**
* 是否单向发送
*/
private boolean oneway;
/**
* 是否异步调用
*/
private boolean async;
public boolean getOneway() {
return oneway;
}
public void setOneway(boolean oneway) {
this.oneway = oneway;
}
public boolean getAsync() {
return async;
}
public void setAsync(boolean async) {
this.async = async;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.protocol.enumeration;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description RPC服务状态
*/
public enum RpcStatus {
SUCCESS(0),
FAIL(1);
private final int code;
RpcStatus(int code) {
this.code = code;
}
public int getCode() {
return code;
}
}
package io.binghe.rpc.protocol.enumeration;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 协议的类型
*/
public enum RpcType {
//请求消息
REQUEST(1),
//响应消息
RESPONSE(2),
//心跳数据
HEARTBEAT(3);
private final int type;
RpcType(int type) {
this.type = type;
}
public static RpcType findByType(int type) {
for (RpcType rpcType : RpcType.values()) {
if (rpcType.getType() == type) {
return rpcType;
}
}
return null;
}
public int getType() {
return type;
}
}
\ No newline at end of file
package io.binghe.rpc.protocol.header;
import java.io.Serializable;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 消息头,目前固定为32个字节
*/
public class RpcHeader implements Serializable {
private static final long serialVersionUID = 6011436680686290298L;
/*
+---------------------------------------------------------------+
| 魔数 2byte | 报文类型 1byte | 状态 1byte | 消息 ID 8byte |
+---------------------------------------------------------------+
| 序列化类型 16byte | 数据长度 4byte |
+---------------------------------------------------------------+
*/
/**
* 魔数 2字节
*/
private short magic;
/**
* 报文类型 1字节
*/
private byte msgType;
/**
* 状态 1字节
*/
private byte status;
/**
* 消息 ID 8字节
*/
private long requestId;
/**
* 序列化类型16字节,不足16字节后面补0,约定序列化类型长度最多不能超过16
*/
private String serializationType;
/**
* 消息长度 4字节
*/
private int msgLen;
public short getMagic() {
return magic;
}
public void setMagic(short magic) {
this.magic = magic;
}
public byte getMsgType() {
return msgType;
}
public void setMsgType(byte msgType) {
this.msgType = msgType;
}
public byte getStatus() {
return status;
}
public void setStatus(byte status) {
this.status = status;
}
public String getSerializationType() {
return serializationType;
}
public void setSerializationType(String serializationType) {
this.serializationType = serializationType;
}
public long getRequestId() {
return requestId;
}
public void setRequestId(long requestId) {
this.requestId = requestId;
}
public int getMsgLen() {
return msgLen;
}
public void setMsgLen(int msgLen) {
this.msgLen = msgLen;
}
}
\ No newline at end of file
package io.binghe.rpc.protocol.header;
import io.binghe.rpc.common.id.IdFactory;
import io.binghe.rpc.constants.RpcConstants;
import io.binghe.rpc.protocol.enumeration.RpcType;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RpcHeaderFactory
*/
public class RpcHeaderFactory {
public static RpcHeader getRequestHeader(String serializationType){
RpcHeader header = new RpcHeader();
long requestId = IdFactory.getId();
header.setMagic(RpcConstants.MAGIC);
header.setRequestId(requestId);
header.setMsgType((byte) RpcType.REQUEST.getType());
header.setStatus((byte) 0x1);
header.setSerializationType(serializationType);
return header;
}
}
\ No newline at end of file
package io.binghe.rpc.protocol.request;
import io.binghe.rpc.protocol.base.RpcMessage;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description Rpc请求封装类,对应的请求id在消息头中
*/
public class RpcRequest extends RpcMessage {
private static final long serialVersionUID = 5555776886650396129L;
/**
* 类名称
*/
private String className;
/**
* 方法名称
*/
private String methodName;
/**
* 参数类型数组
*/
private Class<?>[] parameterTypes;
/**
* 参数数组
*/
private Object[] parameters;
/**
* 版本号
*/
private String version;
/**
* 服务分组
*/
private String group;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
}
package io.binghe.rpc.protocol.response;
import io.binghe.rpc.protocol.base.RpcMessage;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RPC的响应类,对应的请求id在响应头中
*/
public class RpcResponse extends RpcMessage {
private static final long serialVersionUID = 425335064405584525L;
private String error;
private Object result;
public boolean isError() {
return error != null;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeader;
import io.binghe.rpc.protocol.header.RpcHeaderFactory;
import io.binghe.rpc.protocol.request.RpcRequest;
/**
* @author binghe(公众号 : 冰河技术)
* @version 1.0.0
* @description
*/
public class Test {
public static RpcProtocol<RpcRequest> getRpcProtocol(){
RpcHeader header = RpcHeaderFactory.getRequestHeader("jdk");
RpcRequest body = new RpcRequest();
body.setOneway(false);
body.setAsync(false);
body.setClassName("io.binghe.rpc.demo.RpcProtocol");
body.setMethodName("hello");
body.setGroup("binghe");
body.setParameters(new Object[]{"binghe"});
body.setParameterTypes(new Class[]{String.class});
body.setVersion("1.0.0");
RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
protocol.setBody(body);
protocol.setHeader(header);
return protocol;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-provider</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-provider-common</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-codec</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.provider.common.handler;
import io.binghe.rpc.common.helper.RpcServiceHelper;
import io.binghe.rpc.common.threadpool.ServerThreadPool;
import io.binghe.rpc.constants.RpcConstants;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.enumeration.RpcStatus;
import io.binghe.rpc.protocol.enumeration.RpcType;
import io.binghe.rpc.protocol.header.RpcHeader;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.Map;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RPC服务提供者的Handler处理类
*/
public class RpcProviderHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
private final Logger logger = LoggerFactory.getLogger(RpcProviderHandler.class);
//存储服务名称#版本号#分组与对象实例的映射关系
private final Map<String, Object> handlerMap;
//调用采用哪种类型调用真实方法
private final String reflectType;
public RpcProviderHandler(String reflectType, Map<String, Object> handlerMap){
this.reflectType = reflectType;
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) throws Exception {
ServerThreadPool.submit(() -> {
RpcHeader header = protocol.getHeader();
header.setMsgType((byte) RpcType.RESPONSE.getType());
RpcRequest request = protocol.getBody();
logger.debug("Receive request " + header.getRequestId());
RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<RpcResponse>();
RpcResponse response = new RpcResponse();
try {
Object result = handle(request);
response.setResult(result);
response.setAsync(request.getAsync());
response.setOneway(request.getOneway());
header.setStatus((byte) RpcStatus.SUCCESS.getCode());
} catch (Throwable t) {
response.setError(t.toString());
header.setStatus((byte) RpcStatus.FAIL.getCode());
logger.error("RPC Server handle request error",t);
}
responseRpcProtocol.setHeader(header);
responseRpcProtocol.setBody(response);
ctx.writeAndFlush(responseRpcProtocol).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.debug("Send response for request " + header.getRequestId());
}
});
});
}
private Object handle(RpcRequest request) throws Throwable {
String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getVersion(), request.getGroup());
Object serviceBean = handlerMap.get(serviceKey);
if (serviceBean == null) {
throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));
}
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
logger.debug(serviceClass.getName());
logger.debug(methodName);
if (parameterTypes != null && parameterTypes.length > 0){
for (int i = 0; i < parameterTypes.length; ++i) {
logger.debug(parameterTypes[i].getName());
}
}
if (parameters != null && parameters.length > 0){
for (int i = 0; i < parameters.length; ++i) {
logger.debug(parameters[i].toString());
}
}
return invokeMethod(serviceBean, serviceClass, methodName, parameterTypes, parameters);
}
private Object invokeMethod(Object serviceBean, Class<?> serviceClass, String methodName, Class<?>[] parameterTypes, Object[] parameters) throws Throwable {
switch (this.reflectType){
case RpcConstants.REFLECT_TYPE_JDK:
return this.invokeJDKMethod(serviceBean, serviceClass, methodName, parameterTypes, parameters);
case RpcConstants.REFLECT_TYPE_CGLIB:
return this.invokeCGLibMethod(serviceBean, serviceClass, methodName, parameterTypes, parameters);
default:
throw new IllegalArgumentException("not support reflect type");
}
}
/**
* CGLib代理方式
*/
private Object invokeCGLibMethod(Object serviceBean, Class<?> serviceClass, String methodName, Class<?>[] parameterTypes, Object[] parameters) throws Throwable {
// Cglib reflect
logger.info("use cglib reflect type invoke method...");
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
/**
* JDK代理方式
*/
private Object invokeJDKMethod(Object serviceBean, Class<?> serviceClass, String methodName, Class<?>[] parameterTypes, Object[] parameters) throws Throwable {
// JDK reflect
logger.info("use jdk reflect type invoke method...");
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("server caught exception", cause);
ctx.close();
}
}
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.provider.common.server.api;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 启动RPC服务的接口
*/
public interface Server {
/**
* 启动Netty服务
*/
void startNettyServer();
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.provider.common.server.base;
import io.binghe.rpc.codec.RpcDecoder;
import io.binghe.rpc.codec.RpcEncoder;
import io.binghe.rpc.provider.common.handler.RpcProviderHandler;
import io.binghe.rpc.provider.common.server.api.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 基础服务
*/
public class BaseServer implements Server {
private final Logger logger = LoggerFactory.getLogger(BaseServer.class);
//主机域名或者IP地址
protected String host = "127.0.0.1";
//端口号
protected int port = 27110;
//存储的是实体类关系
protected Map<String, Object> handlerMap = new HashMap<>();
private String reflectType;
public BaseServer(String serverAddress, String reflectType){
if (!StringUtils.isEmpty(serverAddress)){
String[] serverArray = serverAddress.split(":");
this.host = serverArray[0];
this.port = Integer.parseInt(serverArray[1]);
}
this.reflectType = reflectType;
}
@Override
public void startNettyServer() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new RpcDecoder())
.addLast(new RpcEncoder())
.addLast(new RpcProviderHandler(reflectType, handlerMap));
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(host, port).sync();
logger.info("Server started on {}:{}", host, port);
future.channel().closeFuture().sync();
}catch (Exception e){
logger.error("RPC Server start error", e);
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-provider</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-provider-native</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-provider-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.provider;
import io.binghe.rpc.common.scanner.server.RpcServiceScanner;
import io.binghe.rpc.provider.common.server.base.BaseServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 以Java原生方式启动启动Rpc
*/
public class RpcSingleServer extends BaseServer {
private final Logger logger = LoggerFactory.getLogger(RpcSingleServer.class);
public RpcSingleServer(String serverAddress, String scanPackage, String reflectType) {
//调用父类构造方法
super(serverAddress, reflectType);
try {
this.handlerMap = RpcServiceScanner.doScannerWithRpcServiceAnnotationFilterAndRegistryService(scanPackage);
} catch (Exception e) {
logger.error("RPC Server init error", e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-provider</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-provider-common</module>
<module>bhrpc-provider-native</module>
</modules>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-proxy</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-proxy-api</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-protocol</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api;
import io.binghe.rpc.proxy.api.object.ObjectProxy;
import io.binghe.rpc.proxy.api.config.ProxyConfig;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 基础代理工厂类
*/
public abstract class BaseProxyFactory<T> implements ProxyFactory {
protected ObjectProxy<T> objectProxy;
@Override
public <T> void init(ProxyConfig<T> proxyConfig) {
this.objectProxy = new ObjectProxy(proxyConfig.getClazz(),
proxyConfig.getServiceVersion(),
proxyConfig.getServiceGroup(),
proxyConfig.getSerializationType(),
proxyConfig.getTimeout(),
proxyConfig.getConsumer(),
proxyConfig.getAsync(),
proxyConfig.getOneway());
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api;
import io.binghe.rpc.proxy.api.config.ProxyConfig;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 代理工厂接口
*/
public interface ProxyFactory {
/**
* 获取代理对象
*/
<T> T getProxy(Class<T> clazz);
/**
* 默认初始化方法
*/
default <T> void init(ProxyConfig<T> proxyConfig){}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.async;
import io.binghe.rpc.proxy.api.future.RPCFuture;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 异步访问接口
*/
public interface IAsyncObjectProxy {
/**
* 异步代理对象调用方法
* @param funcName 方法名称
* @param args 方法参数
* @return 封装好的RPCFuture对象
*/
RPCFuture call(String funcName, Object... args);
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.callback;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 异步回调接口
*/
public interface AsyncRPCCallback {
/**
* 成功后的回调方法
*/
void onSuccess(Object result);
/**
* 异常的回调方法
*/
void onException(Exception e);
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.config;
import io.binghe.rpc.proxy.api.consumer.Consumer;
import java.io.Serializable;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 代理配置类
*/
public class ProxyConfig<T> implements Serializable {
private static final long serialVersionUID = 6648940252795742398L;
/**
* 接口的Class实例
*/
private Class<T> clazz;
/**
* 服务版本号
*/
private String serviceVersion;
/**
* 服务分组
*/
private String serviceGroup;
/**
* 超时时间
*/
private long timeout;
/**
* 消费者接口
*/
private Consumer consumer;
/**
* 序列化类型
*/
private String serializationType;
/**
* 是否异步调用
*/
private boolean async;
/**
* 是否单向调用
*/
private boolean oneway;
public ProxyConfig() {
}
public ProxyConfig(Class<T> clazz, String serviceVersion, String serviceGroup, String serializationType, long timeout, Consumer consumer, boolean async, boolean oneway) {
this.clazz = clazz;
this.serviceVersion = serviceVersion;
this.serviceGroup = serviceGroup;
this.timeout = timeout;
this.consumer = consumer;
this.serializationType = serializationType;
this.async = async;
this.oneway = oneway;
}
public Class<T> getClazz() {
return clazz;
}
public void setClazz(Class<T> clazz) {
this.clazz = clazz;
}
public String getServiceVersion() {
return serviceVersion;
}
public void setServiceVersion(String serviceVersion) {
this.serviceVersion = serviceVersion;
}
public String getServiceGroup() {
return serviceGroup;
}
public void setServiceGroup(String serviceGroup) {
this.serviceGroup = serviceGroup;
}
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public String getSerializationType() {
return serializationType;
}
public void setSerializationType(String serializationType) {
this.serializationType = serializationType;
}
public boolean getAsync() {
return async;
}
public void setAsync(boolean async) {
this.async = async;
}
public boolean getOneway() {
return oneway;
}
public void setOneway(boolean oneway) {
this.oneway = oneway;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.consumer;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.proxy.api.future.RPCFuture;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 服务消费者
*/
public interface Consumer {
/**
* 消费者发送 request 请求
*/
RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception;
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.future;
import io.binghe.rpc.common.threadpool.ClientThreadPool;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.protocol.response.RpcResponse;
import io.binghe.rpc.proxy.api.callback.AsyncRPCCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description RPC框架获取异步结果的自定义Future
*/
public class RPCFuture extends CompletableFuture<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(RPCFuture.class);
private Sync sync;
private RpcProtocol<RpcRequest> requestRpcProtocol;
private RpcProtocol<RpcResponse> responseRpcProtocol;
private long startTime;
private long responseTimeThreshold = 5000;
private List<AsyncRPCCallback> pendingCallbacks = new ArrayList<AsyncRPCCallback>();
private ReentrantLock lock = new ReentrantLock();
public RPCFuture(RpcProtocol<RpcRequest> requestRpcProtocol) {
this.sync = new Sync();
this.requestRpcProtocol = requestRpcProtocol;
this.startTime = System.currentTimeMillis();
}
@Override
public boolean isDone() {
return sync.isDone();
}
@Override
public Object get() throws InterruptedException, ExecutionException {
sync.acquire(-1);
if (this.responseRpcProtocol != null) {
return this.responseRpcProtocol.getBody().getResult();
} else {
return null;
}
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));
if (success) {
if (this.responseRpcProtocol != null) {
return this.responseRpcProtocol.getBody().getResult();
} else {
return null;
}
} else {
throw new RuntimeException("Timeout exception. Request id: " + this.requestRpcProtocol.getHeader().getRequestId()
+ ". Request class name: " + this.requestRpcProtocol.getBody().getClassName()
+ ". Request method: " + this.requestRpcProtocol.getBody().getMethodName());
}
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
public void done(RpcProtocol<RpcResponse> responseRpcProtocol) {
this.responseRpcProtocol = responseRpcProtocol;
sync.release(1);
invokeCallbacks();
// Threshold
long responseTime = System.currentTimeMillis() - startTime;
if (responseTime > this.responseTimeThreshold) {
LOGGER.warn("Service response time is too slow. Request id = " + responseRpcProtocol.getHeader().getRequestId() + ". Response Time = " + responseTime + "ms");
}
}
private void invokeCallbacks() {
lock.lock();
try {
for (final AsyncRPCCallback callback : pendingCallbacks) {
runCallback(callback);
}
} finally {
lock.unlock();
}
}
public RPCFuture addCallback(AsyncRPCCallback callback) {
lock.lock();
try {
if (isDone()) {
runCallback(callback);
} else {
this.pendingCallbacks.add(callback);
}
} finally {
lock.unlock();
}
return this;
}
private void runCallback(final AsyncRPCCallback callback) {
final RpcResponse res = this.responseRpcProtocol.getBody();
ClientThreadPool.submit(() -> {
if (!res.isError()) {
callback.onSuccess(res.getResult());
} else {
callback.onException(new RuntimeException("Response error", new Throwable(res.getError())));
}
});
}
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
//future status
private final int done = 1;
private final int pending = 0;
protected boolean tryAcquire(int acquires) {
return getState() == done;
}
protected boolean tryRelease(int releases) {
if (getState() == pending) {
if (compareAndSetState(pending, done)) {
return true;
}
}
return false;
}
public boolean isDone() {
getState();
return getState() == done;
}
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.api.object;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeaderFactory;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.proxy.api.async.IAsyncObjectProxy;
import io.binghe.rpc.proxy.api.consumer.Consumer;
import io.binghe.rpc.proxy.api.future.RPCFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 对象代理类
*/
public class ObjectProxy <T> implements IAsyncObjectProxy, InvocationHandler{
private static final Logger LOGGER = LoggerFactory.getLogger(ObjectProxy.class);
/**
* 接口的Class对象
*/
private Class<T> clazz;
/**
* 服务版本号
*/
private String serviceVersion;
/**
* 服务分组
*/
private String serviceGroup;
/**
* 超时时间,默认15s
*/
private long timeout = 15000;
/**
* 服务消费者
*/
private Consumer consumer;
/**
* 序列化类型
*/
private String serializationType;
/**
* 是否异步调用
*/
private boolean async;
/**
* 是否单向调用
*/
private boolean oneway;
public ObjectProxy(Class<T> clazz) {
this.clazz = clazz;
}
public ObjectProxy(Class<T> clazz, String serviceVersion, String serviceGroup, String serializationType, long timeout, Consumer consumer, boolean async, boolean oneway) {
this.clazz = clazz;
this.serviceVersion = serviceVersion;
this.timeout = timeout;
this.serviceGroup = serviceGroup;
this.consumer = consumer;
this.serializationType = serializationType;
this.async = async;
this.oneway = oneway;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class == method.getDeclaringClass()) {
String name = method.getName();
if ("equals".equals(name)) {
return proxy == args[0];
} else if ("hashCode".equals(name)) {
return System.identityHashCode(proxy);
} else if ("toString".equals(name)) {
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
} else {
throw new IllegalStateException(String.valueOf(method));
}
}
RpcProtocol<RpcRequest> requestRpcProtocol = new RpcProtocol<RpcRequest>();
requestRpcProtocol.setHeader(RpcHeaderFactory.getRequestHeader(serializationType));
RpcRequest request = new RpcRequest();
request.setVersion(this.serviceVersion);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setGroup(this.serviceGroup);
request.setParameters(args);
request.setAsync(async);
request.setOneway(oneway);
requestRpcProtocol.setBody(request);
// Debug
LOGGER.debug(method.getDeclaringClass().getName());
LOGGER.debug(method.getName());
if (method.getParameterTypes() != null && method.getParameterTypes().length > 0){
for (int i = 0; i < method.getParameterTypes().length; ++i) {
LOGGER.debug(method.getParameterTypes()[i].getName());
}
}
if (args != null && args.length > 0){
for (int i = 0; i < args.length; ++i) {
LOGGER.debug(args[i].toString());
}
}
RPCFuture rpcFuture = this.consumer.sendRequest(requestRpcProtocol);
return rpcFuture == null ? null : timeout > 0 ? rpcFuture.get(timeout, TimeUnit.MILLISECONDS) : rpcFuture.get();
}
@Override
public RPCFuture call(String funcName, Object... args) {
RpcProtocol<RpcRequest> request = createRequest(this.clazz.getName(), funcName, args);
RPCFuture rpcFuture = null;
try {
rpcFuture = this.consumer.sendRequest(request);
} catch (Exception e) {
LOGGER.error("async all throws exception:{}", e);
}
return rpcFuture;
}
private RpcProtocol<RpcRequest> createRequest(String className, String methodName, Object[] args) {
RpcProtocol<RpcRequest> requestRpcProtocol = new RpcProtocol<RpcRequest>();
requestRpcProtocol.setHeader(RpcHeaderFactory.getRequestHeader(serializationType));
RpcRequest request = new RpcRequest();
request.setClassName(className);
request.setMethodName(methodName);
request.setParameters(args);
request.setVersion(this.serviceVersion);
request.setGroup(this.serviceGroup);
Class[] parameterTypes = new Class[args.length];
// Get the right class type
for (int i = 0; i < args.length; i++) {
parameterTypes[i] = getClassType(args[i]);
}
request.setParameterTypes(parameterTypes);
requestRpcProtocol.setBody(request);
LOGGER.debug(className);
LOGGER.debug(methodName);
for (int i = 0; i < parameterTypes.length; ++i) {
LOGGER.debug(parameterTypes[i].getName());
}
for (int i = 0; i < args.length; ++i) {
LOGGER.debug(args[i].toString());
}
return requestRpcProtocol;
}
private Class<?> getClassType(Object obj){
Class<?> classType = obj.getClass();
String typeName = classType.getName();
switch (typeName){
case "java.lang.Integer":
return Integer.TYPE;
case "java.lang.Long":
return Long.TYPE;
case "java.lang.Float":
return Float.TYPE;
case "java.lang.Double":
return Double.TYPE;
case "java.lang.Character":
return Character.TYPE;
case "java.lang.Boolean":
return Boolean.TYPE;
case "java.lang.Short":
return Short.TYPE;
case "java.lang.Byte":
return Byte.TYPE;
}
return classType;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-proxy</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-proxy-jdk</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-proxy-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.proxy.jdk;
import io.binghe.rpc.proxy.api.BaseProxyFactory;
import io.binghe.rpc.proxy.api.ProxyFactory;
import java.lang.reflect.Proxy;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description JDK动态代理
*/
public class JdkProxyFactory<T> extends BaseProxyFactory<T> implements ProxyFactory {
@Override
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(
clazz.getClassLoader(),
new Class<?>[]{clazz},
objectProxy
);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-proxy</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-proxy-api</module>
<module>bhrpc-proxy-jdk</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-serialization</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-serialization-api</artifactId>
<packaging>pom</packaging>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.serialization.api;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 序列化接口
*/
public interface Serialization {
/**
* 序列化
*/
<T> byte[] serialize(T obj);
/**
* 反序列化
*/
<T> T deserialize(byte[] data, Class<T> cls);
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-serialization</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-serialization-jdk</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-serialization-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.serialization.jdk;
import io.binghe.rpc.common.exception.SerializerException;
import io.binghe.rpc.serialization.api.Serialization;
import java.io.*;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description Jdk Serialization
*/
public class JdkSerialization implements Serialization {
@Override
public <T> byte[] serialize(T obj) {
if (obj == null){
throw new SerializerException("serialize object is null");
}
try{
ByteArrayOutputStream os = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(os);
out.writeObject(obj);
return os.toByteArray();
}catch (IOException e){
throw new SerializerException(e.getMessage(), e);
}
}
@Override
public <T> T deserialize(byte[] data, Class<T> cls) {
if (data == null){
throw new SerializerException("deserialize data is null");
}
try{
ByteArrayInputStream is = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(is);
return (T) in.readObject();
}catch (Exception e){
throw new SerializerException(e.getMessage(), e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-serialization</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-serialization-api</module>
<module>bhrpc-serialization-jdk</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-api</artifactId>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.api;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 测试服务接口
*/
public interface DemoService {
String hello(String name);
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test-consumer</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-consumer-codec</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-test-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.consumer.codec;
import io.binghe.rpc.test.consumer.codec.init.RpcTestConsumerInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author binghe (公众号:冰河技术)
* @version 1.0.0
* @description 测试消费端
*/
public class RpcTestConsumer {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
try{
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new RpcTestConsumerInitializer());
bootstrap.connect("127.0.0.1", 27880).sync();
}catch (Exception e){
e.printStackTrace();
} finally {
Thread.sleep(2000);
eventLoopGroup.shutdownGracefully();
}
}
}
package io.binghe.rpc.test.consumer.codec.handler;
import com.alibaba.fastjson.JSONObject;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeaderFactory;
import io.binghe.rpc.protocol.request.RpcRequest;
import io.binghe.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe
* @version 1.0.0
* @description RPC消费者处理器
*/
public class RpcTestConsumerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
private final Logger logger = LoggerFactory.getLogger(RpcTestConsumerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("发送数据开始...");
//模拟发送数据
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
protocol.setHeader(RpcHeaderFactory.getRequestHeader("jdk"));
RpcRequest request = new RpcRequest();
request.setClassName("io.binghe.rpc.test.api.DemoService");
request.setGroup("binghe");
request.setMethodName("hello");
request.setParameters(new Object[]{"binghe"});
request.setParameterTypes(new Class[]{String.class});
request.setVersion("1.0.0");
request.setAsync(false);
request.setOneway(false);
protocol.setBody(request);
logger.info("服务消费者发送的数据===>>>{}", JSONObject.toJSONString(protocol));
ctx.writeAndFlush(protocol);
logger.info("发送数据完毕...");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcProtocol<RpcResponse> protocol) throws Exception {
logger.info("服务消费者接收到的数据===>>>{}", JSONObject.toJSONString(protocol));
}
}
\ No newline at end of file
package io.binghe.rpc.test.consumer.codec.init;
import io.binghe.rpc.codec.RpcDecoder;
import io.binghe.rpc.codec.RpcEncoder;
import io.binghe.rpc.test.consumer.codec.handler.RpcTestConsumerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author binghe
* @version 1.0.0
* @description
*/
public class RpcTestConsumerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline cp = socketChannel.pipeline();
cp.addLast(new RpcEncoder());
cp.addLast(new RpcDecoder());
cp.addLast(new RpcTestConsumerHandler());
}
}
\ No newline at end of file
log4j.rootLogger=Info,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File = d:/logs/log.log
log4j.appender.file.Append = true
log4j.appender.file.Threshold = Error
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test-consumer</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-consumer-handler</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-consumer-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.consumer.handler;
import io.binghe.rpc.consumer.common.RpcConsumer;
import io.binghe.rpc.proxy.api.callback.AsyncRPCCallback;
import io.binghe.rpc.consumer.common.context.RpcContext;
import io.binghe.rpc.proxy.api.future.RPCFuture;
import io.binghe.rpc.protocol.RpcProtocol;
import io.binghe.rpc.protocol.header.RpcHeaderFactory;
import io.binghe.rpc.protocol.request.RpcRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 测试服务消费者
*/
public class RpcConsumerHandlerTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandlerTest.class);
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
RPCFuture rpcFuture = consumer.sendRequest(getRpcRequestProtocol());
rpcFuture.addCallback(new AsyncRPCCallback() {
@Override
public void onSuccess(Object result) {
LOGGER.info("从服务消费者获取到的数据===>>>" + result);
}
@Override
public void onException(Exception e) {
LOGGER.info("抛出了异常===>>>" + e);
}
});
Thread.sleep(200);
consumer.close();
}
public static void mainAsync(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
consumer.sendRequest(getRpcRequestProtocol());
RPCFuture future = RpcContext.getContext().getRPCFuture();
LOGGER.info("从服务消费者获取到的数据===>>>" + future.get());
consumer.close();
}
public static void mainSync(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());
LOGGER.info("从服务消费者获取到的数据===>>>" + future.get());
consumer.close();
}
private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
//模拟发送数据
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
protocol.setHeader(RpcHeaderFactory.getRequestHeader("jdk"));
RpcRequest request = new RpcRequest();
request.setClassName("io.binghe.rpc.test.api.DemoService");
request.setGroup("binghe");
request.setMethodName("hello");
request.setParameters(new Object[]{"binghe"});
request.setParameterTypes(new Class[]{String.class});
request.setVersion("1.0.0");
request.setAsync(false);
request.setOneway(false);
protocol.setBody(request);
return protocol;
}
private static RpcProtocol<RpcRequest> getRpcRequestProtocolAsync(){
//模拟发送数据
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
protocol.setHeader(RpcHeaderFactory.getRequestHeader("jdk"));
RpcRequest request = new RpcRequest();
request.setClassName("io.binghe.rpc.test.api.DemoService");
request.setGroup("binghe");
request.setMethodName("hello");
request.setParameters(new Object[]{"binghe"});
request.setParameterTypes(new Class[]{String.class});
request.setVersion("1.0.0");
request.setAsync(true);
request.setOneway(false);
protocol.setBody(request);
return protocol;
}
private static RpcProtocol<RpcRequest> getRpcRequestProtocolSync(){
//模拟发送数据
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
protocol.setHeader(RpcHeaderFactory.getRequestHeader("jdk"));
RpcRequest request = new RpcRequest();
request.setClassName("io.binghe.rpc.test.api.DemoService");
request.setGroup("binghe");
request.setMethodName("hello");
request.setParameters(new Object[]{"binghe"});
request.setParameterTypes(new Class[]{String.class});
request.setVersion("1.0.0");
request.setAsync(false);
request.setOneway(false);
protocol.setBody(request);
return protocol;
}
}
log4j.rootLogger=Info,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File = d:/logs/log.log
log4j.appender.file.Append = true
log4j.appender.file.Threshold = Error
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test-consumer</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-consumer-native</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-consumer-native</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-test-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.test.consumer;
import io.binghe.rpc.consumer.RpcClient;
import io.binghe.rpc.proxy.api.async.IAsyncObjectProxy;
import io.binghe.rpc.proxy.api.future.RPCFuture;
import io.binghe.rpc.test.api.DemoService;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 测试Java原生启动服务消费者
*/
public class RpcConsumerNativeTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerNativeTest.class);
public static void main(String[] args){
RpcClient rpcClient = new RpcClient("1.0.0", "binghe", "jdk", 3000, false, false);
DemoService demoService = rpcClient.create(DemoService.class);
String result = demoService.hello("binghe");
LOGGER.info("返回的结果数据===>>> " + result);
rpcClient.shutdown();
}
@Test
public void testInterfaceRpc(){
RpcClient rpcClient = new RpcClient("1.0.0", "binghe", "jdk", 3000, false, false);
DemoService demoService = rpcClient.create(DemoService.class);
String result = demoService.hello("binghe");
LOGGER.info("返回的结果数据===>>> " + result);
rpcClient.shutdown();
}
@Test
public void testAsyncInterfaceRpc() throws Exception {
RpcClient rpcClient = new RpcClient("1.0.0", "binghe", "jdk", 3000, false, false);
IAsyncObjectProxy demoService = rpcClient.createAsync(DemoService.class);
RPCFuture future = demoService.call("hello", "binghe");
LOGGER.info("返回的结果数据===>>> " + future.get());
rpcClient.shutdown();
}
}
log4j.rootLogger=Info,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File = d:/logs/log.log
log4j.appender.file.Append = true
log4j.appender.file.Threshold = Error
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-consumer</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-test-consumer-codec</module>
<module>bhrpc-test-consumer-handler</module>
<module>bhrpc-test-consumer-native</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-provider</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-provider-native</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-test-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.provider.service.impl;
import io.binghe.rpc.annotation.RpcService;
import io.binghe.rpc.test.api.DemoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author binghe
* @version 1.0.0
* @description DemoService实现类
*/
@RpcService(interfaceClass = DemoService.class, interfaceClassName = "io.binghe.rpc.test.api.DemoService", version = "1.0.0", group = "binghe")
public class ProviderDemoServiceImpl implements DemoService {
private final Logger logger = LoggerFactory.getLogger(ProviderDemoServiceImpl.class);
@Override
public String hello(String name) {
logger.info("调用hello方法传入的参数为===>>>{}", name);
return "hello " + name;
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.provider.single;
import io.binghe.rpc.provider.RpcSingleServer;
import org.junit.Test;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 测试Java原生启动RPC
*/
public class RpcSingleServerTest {
@Test
public void startRpcSingleServer(){
RpcSingleServer singleServer = new RpcSingleServer("127.0.0.1:27880", "io.binghe.rpc.test", "cglib");
singleServer.startNettyServer();
}
}
log4j.rootLogger=Info,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File = d:/logs/log.log
log4j.appender.file.Append = true
log4j.appender.file.Threshold = Error
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-test</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test-scanner</artifactId>
<dependencies>
<dependency>
<groupId>io.binghe.rpc</groupId>
<artifactId>bhrpc-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.scanner;
import io.binghe.rpc.common.scanner.reference.RpcReferenceScanner;
import io.binghe.rpc.common.scanner.server.RpcServiceScanner;
import io.binghe.rpc.common.scanner.ClassScanner;
import org.junit.Test;
import java.util.List;
/**
* @author binghe
* @version 1.0.0
* @description 扫描测试
*/
public class ScannerTest {
/**
* 扫描io.binghe.rpc.test.scanner包下所有的类
*/
@Test
public void testScannerClassNameList() throws Exception {
List<String> classNameList = ClassScanner.getClassNameList("io.binghe.rpc.test.scanner");
classNameList.forEach(System.out::println);
}
/**
* 扫描io.binghe.rpc.test.scanner包下所有标注了@RpcService注解的类
*/
@Test
public void testScannerClassNameListByRpcService() throws Exception {
RpcServiceScanner.doScannerWithRpcServiceAnnotationFilterAndRegistryService("io.binghe.rpc.test.scanner");
}
/**
* 扫描io.binghe.rpc.test.scanner包下所有标注了@RpcReference注解的类
*/
@Test
public void testScannerClassNameListByRpcReference() throws Exception {
RpcReferenceScanner.doScannerWithRpcReferenceAnnotationFilter("io.binghe.rpc.test.scanner");
}
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.scanner.consumer.service;
/**
* @author binghe
* @version 1.0.0
* @description 服务消费者业务逻辑接口
*/
public interface ConsumerBusinessService {
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.scanner.consumer.service.impl;
import io.binghe.rpc.annotation.RpcReference;
import io.binghe.rpc.test.scanner.consumer.service.ConsumerBusinessService;
import io.binghe.rpc.test.scanner.service.DemoService;
/**
* @author binghe
* @version 1.0.0
* @description 服务消费者业务逻辑实现类
*/
public class ConsumerBusinessServiceImpl implements ConsumerBusinessService {
@RpcReference(registryType = "zookeeper", registryAddress = "127.0.0.1:2181", version = "1.0.0", group = "binghe")
private DemoService demoService;
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.scanner.provider;
import io.binghe.rpc.annotation.RpcService;
import io.binghe.rpc.test.scanner.service.DemoService;
/**
* @author binghe
* @version 1.0.0
* @description DemoService实现类
*/
@RpcService(interfaceClass = DemoService.class, interfaceClassName = "io.binghe.rpc.test.scanner.service.DemoService", version = "1.0.0", group = "binghe")
public class ProviderDemoServiceImpl implements DemoService {
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.rpc.test.scanner.service;
/**
* @author binghe
* @version 1.0.0
* @description 测试@RpcService注解的扫描
*/
public interface DemoService {
}
log4j.rootLogger=Info,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File = d:/logs/log.log
log4j.appender.file.Append = true
log4j.appender.file.Threshold = Error
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bhrpc-learning</artifactId>
<groupId>io.binghe.rpc</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bhrpc-test</artifactId>
<packaging>pom</packaging>
<modules>
<module>bhrpc-test-scanner</module>
<module>bhrpc-test-provider</module>
<module>bhrpc-test-consumer</module>
<module>bhrpc-test-api</module>
</modules>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册