Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
8d781757
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
8d781757
编写于
5月 27, 2017
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove diamond operator for client module with JDK 1.6
上级
e57f9ac4
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
22 addition
and
21 deletion
+22
-21
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
...onsumer/rebalance/AllocateMessageQueueConsistentHash.java
+4
-4
client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
...umer/rebalance/AllocateMessageQueueConsitentHashTest.java
+14
-13
common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
.../rocketmq/common/consistenthash/ConsistentHashRouter.java
+2
-2
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+2
-2
未找到文件。
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
浏览文件 @
8d781757
...
@@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
...
@@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
}
}
Collection
<
ClientNode
>
cidNodes
=
new
ArrayList
<>();
Collection
<
ClientNode
>
cidNodes
=
new
ArrayList
<
ClientNode
>();
for
(
String
cid
:
cidAll
)
{
for
(
String
cid
:
cidAll
)
{
cidNodes
.
add
(
new
ClientNode
(
cid
));
cidNodes
.
add
(
new
ClientNode
(
cid
));
}
}
final
ConsistentHashRouter
<
ClientNode
>
router
;
//for building hash ring
final
ConsistentHashRouter
<
ClientNode
>
router
;
//for building hash ring
if
(
customHashFunction
!=
null
)
{
if
(
customHashFunction
!=
null
)
{
router
=
new
ConsistentHashRouter
<>(
cidNodes
,
virtualNodeCnt
,
customHashFunction
);
router
=
new
ConsistentHashRouter
<
ClientNode
>(
cidNodes
,
virtualNodeCnt
,
customHashFunction
);
}
else
{
}
else
{
router
=
new
ConsistentHashRouter
<>(
cidNodes
,
virtualNodeCnt
);
router
=
new
ConsistentHashRouter
<
ClientNode
>(
cidNodes
,
virtualNodeCnt
);
}
}
List
<
MessageQueue
>
results
=
new
ArrayList
<>();
List
<
MessageQueue
>
results
=
new
ArrayList
<
MessageQueue
>();
for
(
MessageQueue
mq
:
mqAll
)
{
for
(
MessageQueue
mq
:
mqAll
)
{
ClientNode
clientNode
=
router
.
routeNode
(
mq
.
toString
());
ClientNode
clientNode
=
router
.
routeNode
(
mq
.
toString
());
if
(
clientNode
!=
null
&&
currentCID
.
equals
(
clientNode
.
getKey
()))
{
if
(
clientNode
!=
null
&&
currentCID
.
equals
(
clientNode
.
getKey
()))
{
...
...
client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
浏览文件 @
8d781757
...
@@ -23,6 +23,7 @@ import java.util.Map;
...
@@ -23,6 +23,7 @@ import java.util.Map;
import
java.util.Random
;
import
java.util.Random
;
import
java.util.TreeMap
;
import
java.util.TreeMap
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.junit.Assert
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Before
;
...
@@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest {
//System.out.println("mqAll:" + mqAll.toString());
//System.out.println("mqAll:" + mqAll.toString());
List
<
String
>
cidAll
=
createConsumerIdList
(
consumerSize
);
List
<
String
>
cidAll
=
createConsumerIdList
(
consumerSize
);
List
<
MessageQueue
>
allocatedResAll
=
new
ArrayList
<>();
List
<
MessageQueue
>
allocatedResAll
=
new
ArrayList
<
MessageQueue
>();
Map
<
MessageQueue
,
String
>
allocateToAllOrigin
=
new
TreeMap
<>();
Map
<
MessageQueue
,
String
>
allocateToAllOrigin
=
new
TreeMap
<
MessageQueue
,
String
>();
//test allocate all
//test allocate all
{
{
List
<
String
>
cidBegin
=
new
ArrayList
<>(
cidAll
);
List
<
String
>
cidBegin
=
new
ArrayList
<
String
>(
cidAll
);
//System.out.println("cidAll:" + cidBegin.toString());
//System.out.println("cidAll:" + cidBegin.toString());
for
(
String
cid
:
cidBegin
)
{
for
(
String
cid
:
cidBegin
)
{
...
@@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest {
verifyAllocateAll
(
cidBegin
,
mqAll
,
allocatedResAll
));
verifyAllocateAll
(
cidBegin
,
mqAll
,
allocatedResAll
));
}
}
Map
<
MessageQueue
,
String
>
allocateToAllAfterRemoveOne
=
new
TreeMap
<>();
Map
<
MessageQueue
,
String
>
allocateToAllAfterRemoveOne
=
new
TreeMap
<
MessageQueue
,
String
>();
List
<
String
>
cidAfterRemoveOne
=
new
ArrayList
<>(
cidAll
);
List
<
String
>
cidAfterRemoveOne
=
new
ArrayList
<
String
>(
cidAll
);
//test allocate remove one cid
//test allocate remove one cid
{
{
String
removeCID
=
cidAfterRemoveOne
.
remove
(
0
);
String
removeCID
=
cidAfterRemoveOne
.
remove
(
0
);
//System.out.println("removing one cid "+removeCID);
//System.out.println("removing one cid "+removeCID);
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<>();
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<
MessageQueue
>();
Iterator
<
Map
.
Entry
<
MessageQueue
,
String
>>
it
=
allocateToAllOrigin
.
entrySet
().
iterator
();
Iterator
<
Map
.
Entry
<
MessageQueue
,
String
>>
it
=
allocateToAllOrigin
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
String
>
entry
=
it
.
next
();
Map
.
Entry
<
MessageQueue
,
String
>
entry
=
it
.
next
();
...
@@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
}
//System.out.println("cidAll:" + cidAfterRemoveOne.toString());
//System.out.println("cidAll:" + cidAfterRemoveOne.toString());
List
<
MessageQueue
>
allocatedResAllAfterRemove
=
new
ArrayList
<>();
List
<
MessageQueue
>
allocatedResAllAfterRemove
=
new
ArrayList
<
MessageQueue
>();
for
(
String
cid
:
cidAfterRemoveOne
)
{
for
(
String
cid
:
cidAfterRemoveOne
)
{
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterRemoveOne
);
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterRemoveOne
);
allocatedResAllAfterRemove
.
addAll
(
rs
);
allocatedResAllAfterRemove
.
addAll
(
rs
);
...
@@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest {
verifyAfterRemove
(
allocateToAllOrigin
,
allocateToAllAfterRemoveOne
,
removeCID
);
verifyAfterRemove
(
allocateToAllOrigin
,
allocateToAllAfterRemoveOne
,
removeCID
);
}
}
List
<
String
>
cidAfterAdd
=
new
ArrayList
<>(
cidAfterRemoveOne
);
List
<
String
>
cidAfterAdd
=
new
ArrayList
<
String
>(
cidAfterRemoveOne
);
//test allocate add one more cid
//test allocate add one more cid
{
{
String
newCid
=
CID_PREFIX
+
"NEW"
;
String
newCid
=
CID_PREFIX
+
"NEW"
;
//System.out.println("add one more cid "+newCid);
//System.out.println("add one more cid "+newCid);
cidAfterAdd
.
add
(
newCid
);
cidAfterAdd
.
add
(
newCid
);
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<>();
List
<
MessageQueue
>
mqShouldOnlyChanged
=
new
ArrayList
<
MessageQueue
>();
//System.out.println("cidAll:" + cidAfterAdd.toString());
//System.out.println("cidAll:" + cidAfterAdd.toString());
List
<
MessageQueue
>
allocatedResAllAfterAdd
=
new
ArrayList
<>();
List
<
MessageQueue
>
allocatedResAllAfterAdd
=
new
ArrayList
<
MessageQueue
>();
Map
<
MessageQueue
,
String
>
allocateToAll3
=
new
TreeMap
<>();
Map
<
MessageQueue
,
String
>
allocateToAll3
=
new
TreeMap
<
MessageQueue
,
String
>();
for
(
String
cid
:
cidAfterAdd
)
{
for
(
String
cid
:
cidAfterAdd
)
{
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterAdd
);
List
<
MessageQueue
>
rs
=
allocateMessageQueueConsistentHash
.
allocate
(
"testConsumerGroup"
,
cid
,
mqAll
,
cidAfterAdd
);
allocatedResAllAfterAdd
.
addAll
(
rs
);
allocatedResAllAfterAdd
.
addAll
(
rs
);
...
@@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
}
private
List
<
String
>
createConsumerIdList
(
int
size
)
{
private
List
<
String
>
createConsumerIdList
(
int
size
)
{
List
<
String
>
consumerIdList
=
new
ArrayList
<>(
size
);
List
<
String
>
consumerIdList
=
new
ArrayList
<
String
>(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
consumerIdList
.
add
(
CID_PREFIX
+
String
.
valueOf
(
i
));
consumerIdList
.
add
(
CID_PREFIX
+
String
.
valueOf
(
i
));
}
}
...
@@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest {
...
@@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
}
private
List
<
MessageQueue
>
createMessageQueueList
(
int
size
)
{
private
List
<
MessageQueue
>
createMessageQueueList
(
int
size
)
{
List
<
MessageQueue
>
messageQueueList
=
new
ArrayList
<>(
size
);
List
<
MessageQueue
>
messageQueueList
=
new
ArrayList
<
MessageQueue
>(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
MessageQueue
mq
=
new
MessageQueue
(
topic
,
"brokerName"
,
i
);
MessageQueue
mq
=
new
MessageQueue
(
topic
,
"brokerName"
,
i
);
messageQueueList
.
add
(
mq
);
messageQueueList
.
add
(
mq
);
...
...
common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
浏览文件 @
8d781757
...
@@ -30,7 +30,7 @@ import java.util.TreeMap;
...
@@ -30,7 +30,7 @@ import java.util.TreeMap;
* @param <T>
* @param <T>
*/
*/
public
class
ConsistentHashRouter
<
T
extends
Node
>
{
public
class
ConsistentHashRouter
<
T
extends
Node
>
{
private
final
SortedMap
<
Long
,
VirtualNode
<
T
>>
ring
=
new
TreeMap
<>();
private
final
SortedMap
<
Long
,
VirtualNode
<
T
>>
ring
=
new
TreeMap
<
Long
,
VirtualNode
<
T
>
>();
private
final
HashFunction
hashFunction
;
private
final
HashFunction
hashFunction
;
public
ConsistentHashRouter
(
Collection
<
T
>
pNodes
,
int
vNodeCount
)
{
public
ConsistentHashRouter
(
Collection
<
T
>
pNodes
,
int
vNodeCount
)
{
...
@@ -64,7 +64,7 @@ public class ConsistentHashRouter<T extends Node> {
...
@@ -64,7 +64,7 @@ public class ConsistentHashRouter<T extends Node> {
if
(
vNodeCount
<
0
)
throw
new
IllegalArgumentException
(
"illegal virtual node counts :"
+
vNodeCount
);
if
(
vNodeCount
<
0
)
throw
new
IllegalArgumentException
(
"illegal virtual node counts :"
+
vNodeCount
);
int
existingReplicas
=
getExistingReplicas
(
pNode
);
int
existingReplicas
=
getExistingReplicas
(
pNode
);
for
(
int
i
=
0
;
i
<
vNodeCount
;
i
++)
{
for
(
int
i
=
0
;
i
<
vNodeCount
;
i
++)
{
VirtualNode
<
T
>
vNode
=
new
VirtualNode
<>(
pNode
,
i
+
existingReplicas
);
VirtualNode
<
T
>
vNode
=
new
VirtualNode
<
T
>(
pNode
,
i
+
existingReplicas
);
ring
.
put
(
hashFunction
.
hash
(
vNode
.
getKey
()),
vNode
);
ring
.
put
(
hashFunction
.
hash
(
vNode
.
getKey
()),
vNode
);
}
}
}
}
...
...
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
8d781757
...
@@ -722,7 +722,7 @@ public class CommitLog {
...
@@ -722,7 +722,7 @@ public class CommitLog {
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
messageExtBatch
.
setEncodedBuff
(
batchEncoder
.
encode
(
messageExtBatch
));
lockForPutMessage
();
//spin...
putMessageLock
.
lock
();
try
{
try
{
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
long
beginLockTimestamp
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
this
.
beginTimeInLock
=
beginLockTimestamp
;
this
.
beginTimeInLock
=
beginLockTimestamp
;
...
@@ -771,7 +771,7 @@ public class CommitLog {
...
@@ -771,7 +771,7 @@ public class CommitLog {
eclipseTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
eclipseTimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
beginTimeInLock
=
0
;
}
finally
{
}
finally
{
releasePutMessageL
ock
();
putMessageLock
.
unl
ock
();
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录