Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
西红柿炒蛋不加盐
guide-rpc-framework
提交
336316ce
G
guide-rpc-framework
项目概览
西红柿炒蛋不加盐
/
guide-rpc-framework
与 Fork 源项目一致
Fork自
嗝屁小孩纸 / guide-rpc-framework
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
G
guide-rpc-framework
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
336316ce
编写于
4月 28, 2021
作者:
G
Guide哥
提交者:
GitHub
4月 28, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #41 from mesmerizeBy/master
Fix consistency hash bug
上级
91cd884e
c4e912c6
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
61 addition
and
19 deletion
+61
-19
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java
...ava/github/javaguide/loadbalance/AbstractLoadBalance.java
+5
-3
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java
...c/main/java/github/javaguide/loadbalance/LoadBalance.java
+2
-1
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java
...e/loadbalance/loadbalancer/ConsistentHashLoadBalance.java
+8
-4
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java
...javaguide/loadbalance/loadbalancer/RandomLoadBalance.java
+2
-1
rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java
...main/java/github/javaguide/registry/ServiceDiscovery.java
+3
-2
rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java
...java/github/javaguide/registry/zk/ZkServiceDiscovery.java
+4
-2
rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java
...guide/remoting/transport/netty/client/NettyRpcClient.java
+2
-2
rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java
.../javaguide/remoting/transport/socket/SocketRpcClient.java
+1
-1
rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java
...adbalance/loadbalancer/ConsistentHashLoadBalanceTest.java
+23
-2
rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java
...java/github/javaguide/registry/ZkServiceRegistryTest.java
+11
-1
未找到文件。
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java
浏览文件 @
336316ce
package
github.javaguide.loadbalance
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
java.util.List
;
/**
...
...
@@ -10,16 +12,16 @@ import java.util.List;
*/
public
abstract
class
AbstractLoadBalance
implements
LoadBalance
{
@Override
public
String
selectServiceAddress
(
List
<
String
>
serviceAddresses
,
String
rpcServiceName
)
{
public
String
selectServiceAddress
(
List
<
String
>
serviceAddresses
,
RpcRequest
rpcRequest
)
{
if
(
serviceAddresses
==
null
||
serviceAddresses
.
size
()
==
0
)
{
return
null
;
}
if
(
serviceAddresses
.
size
()
==
1
)
{
return
serviceAddresses
.
get
(
0
);
}
return
doSelect
(
serviceAddresses
,
rpc
ServiceName
);
return
doSelect
(
serviceAddresses
,
rpc
Request
);
}
protected
abstract
String
doSelect
(
List
<
String
>
serviceAddresses
,
String
rpcServiceName
);
protected
abstract
String
doSelect
(
List
<
String
>
serviceAddresses
,
RpcRequest
rpcRequest
);
}
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java
浏览文件 @
336316ce
package
github.javaguide.loadbalance
;
import
github.javaguide.extension.SPI
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
java.util.List
;
...
...
@@ -18,5 +19,5 @@ public interface LoadBalance {
* @param serviceAddresses Service address list
* @return target service address
*/
String
selectServiceAddress
(
List
<
String
>
serviceAddresses
,
String
rpcServiceName
);
String
selectServiceAddress
(
List
<
String
>
serviceAddresses
,
RpcRequest
rpcRequest
);
}
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java
浏览文件 @
336316ce
package
github.javaguide.loadbalance.loadbalancer
;
import
github.javaguide.loadbalance.AbstractLoadBalance
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
lombok.extern.slf4j.Slf4j
;
import
java.nio.charset.StandardCharsets
;
import
java.security.MessageDigest
;
import
java.security.NoSuchAlgorithmException
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.TreeMap
;
...
...
@@ -22,8 +24,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private
final
ConcurrentHashMap
<
String
,
ConsistentHashSelector
>
selectors
=
new
ConcurrentHashMap
<>();
@Override
protected
String
doSelect
(
List
<
String
>
serviceAddresses
,
String
rpcServiceName
)
{
protected
String
doSelect
(
List
<
String
>
serviceAddresses
,
RpcRequest
rpcRequest
)
{
int
identityHashCode
=
System
.
identityHashCode
(
serviceAddresses
);
// build rpc service name by rpcRequest
String
rpcServiceName
=
rpcRequest
.
toRpcProperties
().
toRpcServiceName
();
ConsistentHashSelector
selector
=
selectors
.
get
(
rpcServiceName
);
...
...
@@ -33,7 +37,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
selector
=
selectors
.
get
(
rpcServiceName
);
}
return
selector
.
select
(
rpcServiceName
);
return
selector
.
select
(
rpcServiceName
+
Arrays
.
stream
(
rpcRequest
.
getParameters
())
);
}
static
class
ConsistentHashSelector
{
...
...
@@ -73,8 +77,8 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
return
((
long
)
(
digest
[
3
+
idx
*
4
]
&
255
)
<<
24
|
(
long
)
(
digest
[
2
+
idx
*
4
]
&
255
)
<<
16
|
(
long
)
(
digest
[
1
+
idx
*
4
]
&
255
)
<<
8
|
(
long
)
(
digest
[
idx
*
4
]
&
255
))
&
4294967295L
;
}
public
String
select
(
String
rpcService
Name
)
{
byte
[]
digest
=
md5
(
rpcService
Name
);
public
String
select
(
String
rpcService
Key
)
{
byte
[]
digest
=
md5
(
rpcService
Key
);
return
selectForKey
(
hash
(
digest
,
0
));
}
...
...
rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java
浏览文件 @
336316ce
package
github.javaguide.loadbalance.loadbalancer
;
import
github.javaguide.loadbalance.AbstractLoadBalance
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
java.util.List
;
import
java.util.Random
;
...
...
@@ -13,7 +14,7 @@ import java.util.Random;
*/
public
class
RandomLoadBalance
extends
AbstractLoadBalance
{
@Override
protected
String
doSelect
(
List
<
String
>
serviceAddresses
,
String
rpcServiceName
)
{
protected
String
doSelect
(
List
<
String
>
serviceAddresses
,
RpcRequest
rpcRequest
)
{
Random
random
=
new
Random
();
return
serviceAddresses
.
get
(
random
.
nextInt
(
serviceAddresses
.
size
()));
}
...
...
rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java
浏览文件 @
336316ce
package
github.javaguide.registry
;
import
github.javaguide.extension.SPI
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
java.net.InetSocketAddress
;
...
...
@@ -15,8 +16,8 @@ public interface ServiceDiscovery {
/**
* lookup service by rpcServiceName
*
* @param rpc
ServiceName rpc service name
* @param rpc
Request rpc service pojo
* @return service address
*/
InetSocketAddress
lookupService
(
String
rpcServiceName
);
InetSocketAddress
lookupService
(
RpcRequest
rpcRequest
);
}
rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java
浏览文件 @
336316ce
...
...
@@ -6,6 +6,7 @@ import github.javaguide.extension.ExtensionLoader;
import
github.javaguide.loadbalance.LoadBalance
;
import
github.javaguide.registry.ServiceDiscovery
;
import
github.javaguide.registry.zk.util.CuratorUtils
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.curator.framework.CuratorFramework
;
...
...
@@ -27,14 +28,15 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
}
@Override
public
InetSocketAddress
lookupService
(
String
rpcServiceName
)
{
public
InetSocketAddress
lookupService
(
RpcRequest
rpcRequest
)
{
String
rpcServiceName
=
rpcRequest
.
toRpcProperties
().
toRpcServiceName
();
CuratorFramework
zkClient
=
CuratorUtils
.
getZkClient
();
List
<
String
>
serviceUrlList
=
CuratorUtils
.
getChildrenNodes
(
zkClient
,
rpcServiceName
);
if
(
serviceUrlList
==
null
||
serviceUrlList
.
size
()
==
0
)
{
throw
new
RpcException
(
RpcErrorMessageEnum
.
SERVICE_CAN_NOT_BE_FOUND
,
rpcServiceName
);
}
// load balancing
String
targetServiceUrl
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
rpc
ServiceName
);
String
targetServiceUrl
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
rpc
Request
);
log
.
info
(
"Successfully found the service address:[{}]"
,
targetServiceUrl
);
String
[]
socketAddressArray
=
targetServiceUrl
.
split
(
":"
);
String
host
=
socketAddressArray
[
0
];
...
...
rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java
浏览文件 @
336316ce
...
...
@@ -98,9 +98,9 @@ public final class NettyRpcClient implements RpcRequestTransport {
// build return value
CompletableFuture
<
RpcResponse
<
Object
>>
resultFuture
=
new
CompletableFuture
<>();
// build rpc service name by rpcRequest
String
rpcServiceName
=
rpcRequest
.
toRpcProperties
().
toRpcServiceName
();
//
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress
inetSocketAddress
=
serviceDiscovery
.
lookupService
(
rpc
ServiceName
);
InetSocketAddress
inetSocketAddress
=
serviceDiscovery
.
lookupService
(
rpc
Request
);
// get server address related channel
Channel
channel
=
getChannel
(
inetSocketAddress
);
if
(
channel
.
isActive
())
{
...
...
rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java
浏览文件 @
336316ce
...
...
@@ -35,7 +35,7 @@ public class SocketRpcClient implements RpcRequestTransport {
// build rpc service name by rpcRequest
String
rpcServiceName
=
RpcServiceProperties
.
builder
().
serviceName
(
rpcRequest
.
getInterfaceName
())
.
group
(
rpcRequest
.
getGroup
()).
version
(
rpcRequest
.
getVersion
()).
build
().
toRpcServiceName
();
InetSocketAddress
inetSocketAddress
=
serviceDiscovery
.
lookupService
(
rpc
ServiceName
);
InetSocketAddress
inetSocketAddress
=
serviceDiscovery
.
lookupService
(
rpc
Request
);
try
(
Socket
socket
=
new
Socket
())
{
socket
.
connect
(
inetSocketAddress
);
ObjectOutputStream
objectOutputStream
=
new
ObjectOutputStream
(
socket
.
getOutputStream
());
...
...
rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java
浏览文件 @
336316ce
...
...
@@ -2,11 +2,13 @@ package github.javaguide.loadbalance.loadbalancer;
import
github.javaguide.extension.ExtensionLoader
;
import
github.javaguide.loadbalance.LoadBalance
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
org.junit.jupiter.api.Test
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.UUID
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
...
...
@@ -17,10 +19,29 @@ class ConsistentHashLoadBalanceTest {
LoadBalance
loadBalance
=
ExtensionLoader
.
getExtensionLoader
(
LoadBalance
.
class
).
getExtension
(
"loadBalance"
);
List
<
String
>
serviceUrlList
=
new
ArrayList
<>(
Arrays
.
asList
(
"127.0.0.1:9997"
,
"127.0.0.1:9998"
,
"127.0.0.1:9999"
));
String
userRpcServiceName
=
"github.javaguide.UserServicetest1version1"
;
String
userServiceAddress
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
userRpcServiceName
);
//build rpcCall
RpcRequest
rpcRequest
=
RpcRequest
.
builder
()
// .parameters(args)
.
interfaceName
(
userRpcServiceName
)
// .paramTypes(method.getParameterTypes())
.
requestId
(
UUID
.
randomUUID
().
toString
())
.
group
(
"test2"
)
.
version
(
"version2"
)
.
build
();
String
userServiceAddress
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
rpcRequest
);
assertEquals
(
"127.0.0.1:9999"
,
userServiceAddress
);
String
schoolRpcServiceName
=
"github.javaguide.SchoolServicetest1version1"
;
String
schoolServiceAddress
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
schoolRpcServiceName
);
rpcRequest
=
RpcRequest
.
builder
()
// .parameters(args)
.
interfaceName
(
userRpcServiceName
)
// .paramTypes(method.getParameterTypes())
.
requestId
(
UUID
.
randomUUID
().
toString
())
.
group
(
"test2"
)
.
version
(
"version2"
)
.
build
();
String
schoolServiceAddress
=
loadBalance
.
selectServiceAddress
(
serviceUrlList
,
rpcRequest
);
assertEquals
(
"127.0.0.1:9997"
,
schoolServiceAddress
);
}
}
\ No newline at end of file
rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java
浏览文件 @
336316ce
...
...
@@ -2,9 +2,11 @@ package github.javaguide.registry;
import
github.javaguide.registry.zk.ZkServiceDiscovery
;
import
github.javaguide.registry.zk.ZkServiceRegistry
;
import
github.javaguide.remoting.dto.RpcRequest
;
import
org.junit.jupiter.api.Test
;
import
java.net.InetSocketAddress
;
import
java.util.UUID
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertEquals
;
...
...
@@ -21,7 +23,15 @@ class ZkServiceRegistryTest {
InetSocketAddress
givenInetSocketAddress
=
new
InetSocketAddress
(
"127.0.0.1"
,
9333
);
zkServiceRegistry
.
registerService
(
"github.javaguide.registry.zk.ZkServiceRegistry"
,
givenInetSocketAddress
);
ServiceDiscovery
zkServiceDiscovery
=
new
ZkServiceDiscovery
();
InetSocketAddress
acquiredInetSocketAddress
=
zkServiceDiscovery
.
lookupService
(
"github.javaguide.registry.zk.ZkServiceRegistry"
);
RpcRequest
rpcRequest
=
RpcRequest
.
builder
()
// .parameters(args)
.
interfaceName
(
"github.javaguide.registry.zk.ZkServiceRegistry"
)
// .paramTypes(method.getParameterTypes())
.
requestId
(
UUID
.
randomUUID
().
toString
())
.
group
(
"test2"
)
.
version
(
"version2"
)
.
build
();
InetSocketAddress
acquiredInetSocketAddress
=
zkServiceDiscovery
.
lookupService
(
rpcRequest
);
assertEquals
(
givenInetSocketAddress
.
toString
(),
acquiredInetSocketAddress
.
toString
());
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录