Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
adae1624
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
adae1624
编写于
5月 27, 2017
作者:
J
Jaskey
提交者:
dongeforever
5月 27, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67
上级
c7961407
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
600 addition
and
0 deletion
+600
-0
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
...onsumer/rebalance/AllocateMessageQueueConsistentHash.java
+124
-0
client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
...umer/rebalance/AllocateMessageQueueConsitentHashTest.java
+243
-0
common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
.../rocketmq/common/consistenthash/ConsistentHashRouter.java
+140
-0
common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
...g/apache/rocketmq/common/consistenthash/HashFunction.java
+24
-0
common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
.../java/org/apache/rocketmq/common/consistenthash/Node.java
+28
-0
common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
...rg/apache/rocketmq/common/consistenthash/VirtualNode.java
+41
-0
未找到文件。
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer.rebalance
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.common.consistenthash.ConsistentHashRouter
;
import
org.apache.rocketmq.common.consistenthash.HashFunction
;
import
org.apache.rocketmq.common.consistenthash.Node
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.slf4j.Logger
;
/**
* Consistent Hashing queue algorithm
*/
public
class
AllocateMessageQueueConsistentHash
implements
AllocateMessageQueueStrategy
{
private
final
Logger
log
=
ClientLogger
.
getLog
();
private
final
int
virtualNodeCnt
;
private
final
HashFunction
customHashFunction
;
public
AllocateMessageQueueConsistentHash
()
{
this
(
10
);
}
public
AllocateMessageQueueConsistentHash
(
int
virtualNodeCnt
)
{
this
(
virtualNodeCnt
,
null
);
}
public
AllocateMessageQueueConsistentHash
(
int
virtualNodeCnt
,
HashFunction
customHashFunction
)
{
if
(
virtualNodeCnt
<
0
)
{
throw
new
IllegalArgumentException
(
"illegal virtualNodeCnt :"
+
virtualNodeCnt
);
}
this
.
virtualNodeCnt
=
virtualNodeCnt
;
this
.
customHashFunction
=
customHashFunction
;
}
@Override
public
List
<
MessageQueue
>
allocate
(
String
consumerGroup
,
String
currentCID
,
List
<
MessageQueue
>
mqAll
,
List
<
String
>
cidAll
)
{
if
(
currentCID
==
null
||
currentCID
.
length
()
<
1
)
{
throw
new
IllegalArgumentException
(
"currentCID is empty"
);
}
if
(
mqAll
==
null
||
mqAll
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"mqAll is null or mqAll empty"
);
}
if
(
cidAll
==
null
||
cidAll
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"cidAll is null or cidAll empty"
);
}
List
<
MessageQueue
>
result
=
new
ArrayList
<
MessageQueue
>();
if
(!
cidAll
.
contains
(
currentCID
))
{
log
.
info
(
"[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}"
,
consumerGroup
,
currentCID
,
cidAll
);
return
result
;
}
Collection
<
ClientNode
>
cidNodes
=
new
ArrayList
<>();
for
(
String
cid
:
cidAll
)
{
cidNodes
.
add
(
new
ClientNode
(
cid
));
}
final
ConsistentHashRouter
<
ClientNode
>
router
;
//for building hash ring
if
(
customHashFunction
!=
null
)
{
router
=
new
ConsistentHashRouter
<>(
cidNodes
,
virtualNodeCnt
,
customHashFunction
);
}
else
{
router
=
new
ConsistentHashRouter
<>(
cidNodes
,
virtualNodeCnt
);
}
List
<
MessageQueue
>
results
=
new
ArrayList
<>();
for
(
MessageQueue
mq
:
mqAll
)
{
ClientNode
clientNode
=
router
.
routeNode
(
mq
.
toString
());
if
(
clientNode
!=
null
&&
currentCID
.
equals
(
clientNode
.
getKey
()))
{
results
.
add
(
mq
);
}
}
return
results
;
}
@Override
public
String
getName
()
{
return
"CONSISTENT_HASH"
;
}
private
static
class
ClientNode
implements
Node
{
private
final
String
clientID
;
public
ClientNode
(
String
clientID
)
{
this
.
clientID
=
clientID
;
}
@Override
public
String
getKey
()
{
return
clientID
;
}
}
}
client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer.rebalance
;
import
java.util.ArrayList
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
import
java.util.TreeMap
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
public
class
AllocateMessageQueueConsitentHashTest
{
private
String
topic
;
private
static
final
String
CID_PREFIX
=
"CID-"
;
@Before
public
void
init
()
{
topic
=
"topic_test"
;
}
public
void
printMessageQueue
(
List
<
MessageQueue
>
messageQueueList
,
String
name
)
{
if
(
messageQueueList
==
null
||
messageQueueList
.
size
()
<
1
)
return
;
System
.
out
.
println
(
name
+
".......................................start"
);
for
(
MessageQueue
messageQueue
:
messageQueueList
)
{
System
.
out
.
println
(
messageQueue
);
}
System
.
out
.
println
(
name
+
".......................................end"
);
}
@Test
public
void
testCurrentCIDNotExists
()
{
String
currentCID
=
String
.
valueOf
(
Integer
.
MAX_VALUE
);
List
<
String
>
consumerIdList
=
createConsumerIdList
(
2
);
List
<
MessageQueue
>
messageQueueList
=
createMessageQueueList
(
6
);
List
<
MessageQueue
>
result
=
new
AllocateMessageQueueConsistentHash
().
allocate
(
""
,
currentCID
,
messageQueueList
,
consumerIdList
);
printMessageQueue
(
result
,
"testCurrentCIDNotExists"
);
Assert
.
assertEquals
(
result
.
size
(),
0
);
}
@Test
(
expected
=
IllegalArgumentException
.
class
)
public
void
testCurrentCIDIllegalArgument
()
{
List
<
String
>
consumerIdList
=
createConsumerIdList
(
2
);
List
<
MessageQueue
>
messageQueueList
=
createMessageQueueList
(
6
);
new
AllocateMessageQueueConsistentHash
().
allocate
(
""
,
""
,
messageQueueList
,
consumerIdList
);
}
@Test
(
expected
=
IllegalArgumentException
.
class
)
public
void
testMessageQueueIllegalArgument
()
{
String
currentCID
=
"0"
;
List
<
String
>
consumerIdList
=
createConsumerIdList
(
2
);
new
AllocateMessageQueueConsistentHash
().
allocate
(
""
,
currentCID
,
null
,
consumerIdList
);
}
@Test
(
expected
=
IllegalArgumentException
.
class
)
public
void
testConsumerIdIllegalArgument
()
{
String
currentCID
=
"0"
;
List
<
MessageQueue
>
messageQueueList
=
createMessageQueueList
(
6
);
new
AllocateMessageQueueConsistentHash
().
allocate
(
""
,
currentCID
,
messageQueueList
,
null
);
}
@Test
public
void
testAllocate1
()
{
testAllocate
(
20
,
10
);
}
@Test
public
void
testAllocate2
()
{
testAllocate
(
10
,
20
);
}
@Test
public
void
testRun100RandomCase
(){
for
(
int
i
=
0
;
i
<
100
;
i
++){
int
consumerSize
=
new
Random
().
nextInt
(
200
)+
1
;
//1-200
int
queueSize
=
new
Random
().
nextInt
(
100
)+
1
;
//1-100
testAllocate
(
queueSize
,
consumerSize
);
try
{
Thread
.
sleep
(
1
);
}
catch
(
InterruptedException
e
)
{}
}
}
public
void
testAllocate
(
int
queueSize
,
int
consumerSize
)
{
AllocateMessageQueueStrategy
allocateMessageQueueConsistentHash
=
new
AllocateMessageQueueConsistentHash
(
3
);
List
<
MessageQueue
>
mqAll
=
createMessageQueueList
(
queueSize
);
//System.out.println("mqAll:" + mqAll.toString());
List
<
String
>
cidAll
=
createConsumerIdList
(
consumerSize
);
List
<
MessageQueue
>
allocatedResAll
=
new
ArrayList
<>();
Map
<
MessageQueue
,
String
>
allocateToAllOrigin
=
new
TreeMap
<>();
//test allocate all
{
List
<
String
>
cidBegin
=
new
ArrayList
<>(
cidAll
);
//System.out.println("cidAll:" + cidBegin.toString());
for
(
String
cid
:
cidBegin
)
{
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidBegin
);
for
(
MessageQueue
mq
:
rs
)
{
allocateToAllOrigin
.
put
(
mq
,
cid
);
}
allocatedResAll
.
addAll
(
rs
);
//System.out.println("rs[" + cid + "]:" + rs.toString());
}
Assert
.
assertTrue
(
verifyAllocateAll
(
cidBegin
,
mqAll
,
allocatedResAll
));
}
Map
<
MessageQueue
,
String
>
allocateToAllAfterRemoveOne
=
new
TreeMap
<>();
List
<
String
>
cidAfterRemoveOne
=
new
ArrayList
<>(
cidAll
);
//test allocate remove one cid
{
String
removeCID
=
cidAfterRemoveOne
.
remove
(
0
);
//System.out.println("removing one cid "+removeCID);
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<>();
Iterator
<
Map
.
Entry
<
MessageQueue
,
String
>>
it
=
allocateToAllOrigin
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
String
>
entry
=
it
.
next
();
if
(
entry
.
getValue
().
equals
(
removeCID
))
{
mqShouldOnlyChanged
.
add
(
entry
.
getKey
());
}
}
//System.out.println("cidAll:" + cidAfterRemoveOne.toString());
List
<
MessageQueue
>
allocatedResAllAfterRemove
=
new
ArrayList
<>();
for
(
String
cid
:
cidAfterRemoveOne
)
{
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterRemoveOne
);
allocatedResAllAfterRemove
.
addAll
(
rs
);
for
(
MessageQueue
mq
:
rs
)
{
allocateToAllAfterRemoveOne
.
put
(
mq
,
cid
);
}
//System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
}
Assert
.
assertTrue
(
"queueSize"
+
queueSize
+
"consumerSize:"
+
consumerSize
+
"\nmqAll:"
+
mqAll
+
"\nallocatedResAllAfterRemove"
+
allocatedResAllAfterRemove
,
verifyAllocateAll
(
cidAfterRemoveOne
,
mqAll
,
allocatedResAllAfterRemove
));
verifyAfterRemove
(
allocateToAllOrigin
,
allocateToAllAfterRemoveOne
,
removeCID
);
}
List
<
String
>
cidAfterAdd
=
new
ArrayList
<>(
cidAfterRemoveOne
);
//test allocate add one more cid
{
String
newCid
=
CID_PREFIX
+
"NEW"
;
//System.out.println("add one more cid "+newCid);
cidAfterAdd
.
add
(
newCid
);
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<>();
//System.out.println("cidAll:" + cidAfterAdd.toString());
List
<
MessageQueue
>
allocatedResAllAfterAdd
=
new
ArrayList
<>();
Map
<
MessageQueue
,
String
>
allocateToAll3
=
new
TreeMap
<>();
for
(
String
cid
:
cidAfterAdd
)
{
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterAdd
);
allocatedResAllAfterAdd
.
addAll
(
rs
);
for
(
MessageQueue
mq
:
rs
)
{
allocateToAll3
.
put
(
mq
,
cid
);
if
(
cid
.
equals
(
newCid
)){
mqShouldOnlyChanged
.
add
(
mq
);
}
}
//System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
}
Assert
.
assertTrue
(
verifyAllocateAll
(
cidAfterAdd
,
mqAll
,
allocatedResAllAfterAdd
));
verifyAfterAdd
(
allocateToAllAfterRemoveOne
,
allocateToAll3
,
newCid
);
}
}
private
boolean
verifyAllocateAll
(
List
<
String
>
cidAll
,
List
<
MessageQueue
>
mqAll
,
List
<
MessageQueue
>
allocatedResAll
)
{
if
(
cidAll
.
isEmpty
()){
return
allocatedResAll
.
isEmpty
();
}
return
mqAll
.
containsAll
(
allocatedResAll
)
&&
allocatedResAll
.
containsAll
(
mqAll
);
}
private
void
verifyAfterRemove
(
Map
<
MessageQueue
,
String
>
allocateToBefore
,
Map
<
MessageQueue
,
String
>
allocateAfter
,
String
removeCID
)
{
for
(
MessageQueue
mq
:
allocateToBefore
.
keySet
())
{
String
allocateToOrigin
=
allocateToBefore
.
get
(
mq
);
if
(
allocateToOrigin
.
equals
(
removeCID
))
{
}
else
{
//the rest queue should be the same
Assert
.
assertTrue
(
allocateAfter
.
get
(
mq
).
equals
(
allocateToOrigin
));
//should be the same
}
}
}
private
void
verifyAfterAdd
(
Map
<
MessageQueue
,
String
>
allocateBefore
,
Map
<
MessageQueue
,
String
>
allocateAfter
,
String
newCID
)
{
for
(
MessageQueue
mq
:
allocateAfter
.
keySet
())
{
String
allocateToOrigin
=
allocateBefore
.
get
(
mq
);
String
allocateToAfter
=
allocateAfter
.
get
(
mq
);
if
(
allocateToAfter
.
equals
(
newCID
))
{
}
else
{
//the rest queue should be the same
Assert
.
assertTrue
(
"it was allocated to "
+
allocateToOrigin
+
". Now, it is to "
+
allocateAfter
.
get
(
mq
)+
" mq:"
+
mq
,
allocateAfter
.
get
(
mq
).
equals
(
allocateToOrigin
));
//should be the same
}
}
}
private
List
<
String
>
createConsumerIdList
(
int
size
)
{
List
<
String
>
consumerIdList
=
new
ArrayList
<>(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
consumerIdList
.
add
(
CID_PREFIX
+
String
.
valueOf
(
i
));
}
return
consumerIdList
;
}
private
List
<
MessageQueue
>
createMessageQueueList
(
int
size
)
{
List
<
MessageQueue
>
messageQueueList
=
new
ArrayList
<>(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
MessageQueue
mq
=
new
MessageQueue
(
topic
,
"brokerName"
,
i
);
messageQueueList
.
add
(
mq
);
}
return
messageQueueList
;
}
}
common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.consistenthash
;
import
java.security.MessageDigest
;
import
java.security.NoSuchAlgorithmException
;
import
java.util.Collection
;
import
java.util.Iterator
;
import
java.util.SortedMap
;
import
java.util.TreeMap
;
/**
* To hash Node objects to a hash ring with a certain amount of virtual node.
* Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
*
* @param <T>
*/
public
class
ConsistentHashRouter
<
T
extends
Node
>
{
private
final
SortedMap
<
Long
,
VirtualNode
<
T
>>
ring
=
new
TreeMap
<>();
private
final
HashFunction
hashFunction
;
public
ConsistentHashRouter
(
Collection
<
T
>
pNodes
,
int
vNodeCount
)
{
this
(
pNodes
,
vNodeCount
,
new
MD5Hash
());
}
/**
*
* @param pNodes collections of physical nodes
* @param vNodeCount amounts of virtual nodes
* @param hashFunction hash Function to hash Node instances
*/
public
ConsistentHashRouter
(
Collection
<
T
>
pNodes
,
int
vNodeCount
,
HashFunction
hashFunction
)
{
if
(
hashFunction
==
null
)
{
throw
new
NullPointerException
(
"Hash Function is null"
);
}
this
.
hashFunction
=
hashFunction
;
if
(
pNodes
!=
null
)
{
for
(
T
pNode
:
pNodes
)
{
addNode
(
pNode
,
vNodeCount
);
}
}
}
/**
* add physic node to the hash ring with some virtual nodes
* @param pNode physical node needs added to hash ring
* @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
*/
public
void
addNode
(
T
pNode
,
int
vNodeCount
)
{
if
(
vNodeCount
<
0
)
throw
new
IllegalArgumentException
(
"illegal virtual node counts :"
+
vNodeCount
);
int
existingReplicas
=
getExistingReplicas
(
pNode
);
for
(
int
i
=
0
;
i
<
vNodeCount
;
i
++)
{
VirtualNode
<
T
>
vNode
=
new
VirtualNode
<>(
pNode
,
i
+
existingReplicas
);
ring
.
put
(
hashFunction
.
hash
(
vNode
.
getKey
()),
vNode
);
}
}
/**
* remove the physical node from the hash ring
* @param pNode
*/
public
void
removeNode
(
T
pNode
)
{
Iterator
<
Long
>
it
=
ring
.
keySet
().
iterator
();
while
(
it
.
hasNext
())
{
Long
key
=
it
.
next
();
VirtualNode
<
T
>
virtualNode
=
ring
.
get
(
key
);
if
(
virtualNode
.
isVirtualNodeOf
(
pNode
))
{
it
.
remove
();
}
}
}
/**
* with a specified key, route the nearest Node instance in the current hash ring
* @param objectKey the object key to find a nearest Node
* @return
*/
public
T
routeNode
(
String
objectKey
)
{
if
(
ring
.
isEmpty
())
{
return
null
;
}
Long
hashVal
=
hashFunction
.
hash
(
objectKey
);
SortedMap
<
Long
,
VirtualNode
<
T
>>
tailMap
=
ring
.
tailMap
(
hashVal
);
Long
nodeHashVal
=
!
tailMap
.
isEmpty
()
?
tailMap
.
firstKey
()
:
ring
.
firstKey
();
return
ring
.
get
(
nodeHashVal
).
getPhysicalNode
();
}
public
int
getExistingReplicas
(
T
pNode
)
{
int
replicas
=
0
;
for
(
VirtualNode
<
T
>
vNode
:
ring
.
values
())
{
if
(
vNode
.
isVirtualNodeOf
(
pNode
))
{
replicas
++;
}
}
return
replicas
;
}
//default hash function
private
static
class
MD5Hash
implements
HashFunction
{
MessageDigest
instance
;
public
MD5Hash
()
{
try
{
instance
=
MessageDigest
.
getInstance
(
"MD5"
);
}
catch
(
NoSuchAlgorithmException
e
)
{
}
}
@Override
public
long
hash
(
String
key
)
{
instance
.
reset
();
instance
.
update
(
key
.
getBytes
());
byte
[]
digest
=
instance
.
digest
();
long
h
=
0
;
for
(
int
i
=
0
;
i
<
4
;
i
++)
{
h
<<=
8
;
h
|=
((
int
)
digest
[
i
])
&
0xFF
;
}
return
h
;
}
}
}
common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.consistenthash
;
/**
* Hash String to long value
*/
public
interface
HashFunction
{
long
hash
(
String
key
);
}
common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.consistenthash
;
/**
* Represent a node which should be mapped to a hash ring
*/
public
interface
Node
{
/**
*
* @return the key which will be used for hash mapping
*/
String
getKey
();
}
common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
0 → 100644
浏览文件 @
adae1624
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.consistenthash
;
public
class
VirtualNode
<
T
extends
Node
>
implements
Node
{
final
T
physicalNode
;
final
int
replicaIndex
;
public
VirtualNode
(
T
physicalNode
,
int
replicaIndex
)
{
this
.
replicaIndex
=
replicaIndex
;
this
.
physicalNode
=
physicalNode
;
}
@Override
public
String
getKey
()
{
return
physicalNode
.
getKey
()
+
"-"
+
replicaIndex
;
}
public
boolean
isVirtualNodeOf
(
T
pNode
)
{
return
physicalNode
.
getKey
().
equals
(
pNode
.
getKey
());
}
public
T
getPhysicalNode
()
{
return
physicalNode
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录