Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
a8ef92e9
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
a8ef92e9
编写于
11月 24, 2021
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix the serialize probelm
上级
6c640286
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
110 addition
and
92 deletion
+110
-92
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+3
-2
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
...pache/rocketmq/broker/processor/PullMessageProcessor.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
...pache/rocketmq/broker/topic/TopicQueueMappingManager.java
+6
-5
common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
...he/rocketmq/common/statictopic/LogicQueueMappingItem.java
+28
-5
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
...rocketmq/common/statictopic/TopicQueueMappingContext.java
+5
-3
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
.../rocketmq/common/statictopic/TopicQueueMappingDetail.java
+26
-39
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
...he/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+13
-1
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
...che/rocketmq/common/statictopic/TopicQueueMappingOne.java
+5
-3
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
...e/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+8
-9
common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
...he/rocketmq/common/statictopic/TopicQueueMappingTest.java
+11
-20
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+3
-3
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
a8ef92e9
...
...
@@ -95,6 +95,7 @@ import org.apache.rocketmq.common.DataVersion;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail
;
import
org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
...
...
@@ -1084,7 +1085,7 @@ public class BrokerController {
Map
<
String
,
TopicQueueMappingInfo
>
topicQueueMappingInfoMap
=
topicConfigList
.
stream
()
.
map
(
TopicConfig:
:
getTopicName
)
.
map
(
topicName
->
Optional
.
ofNullable
(
this
.
topicQueueMappingManager
.
getTopicQueueMapping
(
topicName
))
.
map
(
info
->
new
AbstractMap
.
SimpleImmutableEntry
<>(
topicName
,
info
.
cloneAsMappingInfo
(
)))
.
map
(
info
->
new
AbstractMap
.
SimpleImmutableEntry
<>(
topicName
,
TopicQueueMappingDetail
.
cloneAsMappingInfo
(
info
)))
.
orElse
(
null
))
.
filter
(
Objects:
:
nonNull
)
.
collect
(
Collectors
.
toMap
(
Map
.
Entry
::
getKey
,
Map
.
Entry
::
getValue
));
...
...
@@ -1103,7 +1104,7 @@ public class BrokerController {
topicConfigWrapper
.
setTopicConfigTable
(
this
.
getTopicConfigManager
().
getTopicConfigTable
());
topicConfigWrapper
.
setTopicQueueMappingInfoMap
(
this
.
getTopicQueueMappingManager
().
getTopicQueueMappingTable
().
entrySet
().
stream
().
map
(
entry
->
new
AbstractMap
.
SimpleImmutableEntry
<>(
entry
.
getKey
(),
entry
.
getValue
().
cloneAsMappingInfo
(
))
entry
->
new
AbstractMap
.
SimpleImmutableEntry
<>(
entry
.
getKey
(),
TopicQueueMappingDetail
.
cloneAsMappingInfo
(
entry
.
getValue
()
))
).
collect
(
Collectors
.
toMap
(
Map
.
Entry
::
getKey
,
Map
.
Entry
::
getValue
)));
if
(!
PermName
.
isWriteable
(
this
.
getBrokerConfig
().
getBrokerPermission
())
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
a8ef92e9
...
...
@@ -645,7 +645,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
||
!
mappingDetail
.
getBname
().
equals
(
mappingItem
.
getBname
()))
{
return
buildErrorResponse
(
ResponseCode
.
NOT_LEADER_FOR_QUEUE
,
String
.
format
(
"%s-%d does not exit in request process of current broker %s"
,
mappingContext
.
getTopic
(),
mappingContext
.
getGlobalId
(),
mappingDetail
.
getBname
()));
}
Immutable
List
<
LogicQueueMappingItem
>
mappingItems
=
mappingContext
.
getMappingItemList
();
List
<
LogicQueueMappingItem
>
mappingItems
=
mappingContext
.
getMappingItemList
();
//TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
Long
timestamp
=
requestHeader
.
getTimestamp
();
long
offset
=
-
1
;
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
浏览文件 @
a8ef92e9
...
...
@@ -187,7 +187,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//handle max offset
{
if
(
mappingItem
.
checkIfEndOffsetDecided
())
{
responseHeader
.
setMaxOffset
(
Math
.
max
(
mappingItem
.
computeMaxStaticQueueOffset
(),
mappingDetail
.
computeMaxOffsetFromMapping
(
mappingContext
.
getGlobalId
())));
responseHeader
.
setMaxOffset
(
Math
.
max
(
mappingItem
.
computeMaxStaticQueueOffset
(),
TopicQueueMappingDetail
.
computeMaxOffsetFromMapping
(
mappingDetail
,
mappingContext
.
getGlobalId
())));
}
else
{
responseHeader
.
setMaxOffset
(
mappingItem
.
computeStaticQueueOffsetUpToEnd
(
responseHeader
.
getMaxOffset
()));
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
浏览文件 @
a8ef92e9
...
...
@@ -34,6 +34,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import
org.apache.rocketmq.common.rpc.TopicQueueRequestHeader
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
java.util.List
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.TimeUnit
;
...
...
@@ -93,8 +94,8 @@ public class TopicQueueMappingManager extends ConfigManager {
throw
new
RuntimeException
(
String
.
format
(
"Can't accept data with small epoch %d < %d"
,
newDetail
.
getEpoch
(),
oldDetail
.
getEpoch
()));
}
for
(
Integer
globalId
:
oldDetail
.
getHostedQueues
().
keySet
())
{
Immutable
List
<
LogicQueueMappingItem
>
oldItems
=
oldDetail
.
getHostedQueues
().
get
(
globalId
);
Immutable
List
<
LogicQueueMappingItem
>
newItems
=
newDetail
.
getHostedQueues
().
get
(
globalId
);
List
<
LogicQueueMappingItem
>
oldItems
=
oldDetail
.
getHostedQueues
().
get
(
globalId
);
List
<
LogicQueueMappingItem
>
newItems
=
newDetail
.
getHostedQueues
().
get
(
globalId
);
if
(
newItems
==
null
)
{
//keep the old
newDetail
.
getHostedQueues
().
put
(
globalId
,
oldItems
);
...
...
@@ -191,18 +192,18 @@ public class TopicQueueMappingManager extends ConfigManager {
return
new
TopicQueueMappingContext
(
requestHeader
.
getTopic
(),
globalId
,
globalOffset
,
mappingDetail
,
null
,
null
);
}
Immutable
List
<
LogicQueueMappingItem
>
mappingItemList
=
null
;
List
<
LogicQueueMappingItem
>
mappingItemList
=
null
;
LogicQueueMappingItem
mappingItem
=
null
;
if
(
globalOffset
==
null
||
Long
.
MAX_VALUE
==
globalOffset
)
{
mappingItemList
=
mappingDetail
.
getMappingInfo
(
globalId
);
mappingItemList
=
TopicQueueMappingDetail
.
getMappingInfo
(
mappingDetail
,
globalId
);
if
(
mappingItemList
!=
null
&&
mappingItemList
.
size
()
>
0
)
{
mappingItem
=
mappingItemList
.
get
(
mappingItemList
.
size
()
-
1
);
}
}
else
{
mappingItemList
=
mappingDetail
.
getMappingInfo
(
globalId
);
mappingItemList
=
TopicQueueMappingDetail
.
getMappingInfo
(
mappingDetail
,
globalId
);
mappingItem
=
TopicQueueMappingDetail
.
findLogicQueueMappingItem
(
mappingItemList
,
globalOffset
);
}
return
new
TopicQueueMappingContext
(
requestHeader
.
getTopic
(),
globalId
,
globalOffset
,
mappingDetail
,
mappingItemList
,
mappingItem
);
...
...
common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
浏览文件 @
a8ef92e9
package
org.apache.rocketmq.common.statictopic
;
public
class
LogicQueueMappingItem
{
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
private
final
int
gen
;
// immutable
private
final
int
queueId
;
//, immutable
private
final
String
bname
;
//important, immutable
public
class
LogicQueueMappingItem
extends
RemotingSerializable
{
private
int
gen
;
// immutable
private
int
queueId
;
//, immutable
private
String
bname
;
//important, immutable
private
long
logicOffset
;
// the start of the logic offset, important, can be changed by command only once
private
final
long
startOffset
;
// the start of the physical offset, should always be 0, immutable
private
long
startOffset
;
// the start of the physical offset, should always be 0, immutable
private
long
endOffset
=
-
1
;
// the end of the physical offset, excluded, revered -1, mutable
private
long
timeOfStart
=
-
1
;
// mutable, reserved
private
long
timeOfEnd
=
-
1
;
// mutable, reserved
//make sure it has a default constructor
public
LogicQueueMappingItem
()
{
}
public
LogicQueueMappingItem
(
int
gen
,
int
queueId
,
String
bname
,
long
logicOffset
,
long
startOffset
,
long
endOffset
,
long
timeOfStart
,
long
timeOfEnd
)
{
this
.
gen
=
gen
;
this
.
queueId
=
queueId
;
...
...
@@ -112,6 +119,22 @@ public class LogicQueueMappingItem {
this
.
timeOfEnd
=
timeOfEnd
;
}
public
void
setGen
(
int
gen
)
{
this
.
gen
=
gen
;
}
public
void
setQueueId
(
int
queueId
)
{
this
.
queueId
=
queueId
;
}
public
void
setBname
(
String
bname
)
{
this
.
bname
=
bname
;
}
public
void
setStartOffset
(
long
startOffset
)
{
this
.
startOffset
=
startOffset
;
}
@Override
public
String
toString
()
{
return
"LogicQueueMappingItem{"
+
...
...
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
浏览文件 @
a8ef92e9
...
...
@@ -18,15 +18,17 @@ package org.apache.rocketmq.common.statictopic;
import
com.google.common.collect.ImmutableList
;
import
java.util.List
;
public
class
TopicQueueMappingContext
{
private
String
topic
;
private
Integer
globalId
;
private
Long
globalOffset
;
private
TopicQueueMappingDetail
mappingDetail
;
private
Immutable
List
<
LogicQueueMappingItem
>
mappingItemList
;
private
List
<
LogicQueueMappingItem
>
mappingItemList
;
private
LogicQueueMappingItem
mappingItem
;
public
TopicQueueMappingContext
(
String
topic
,
Integer
globalId
,
Long
globalOffset
,
TopicQueueMappingDetail
mappingDetail
,
Immutable
List
<
LogicQueueMappingItem
>
mappingItemList
,
LogicQueueMappingItem
mappingItem
)
{
public
TopicQueueMappingContext
(
String
topic
,
Integer
globalId
,
Long
globalOffset
,
TopicQueueMappingDetail
mappingDetail
,
List
<
LogicQueueMappingItem
>
mappingItemList
,
LogicQueueMappingItem
mappingItem
)
{
this
.
topic
=
topic
;
this
.
globalId
=
globalId
;
this
.
globalOffset
=
globalOffset
;
...
...
@@ -73,7 +75,7 @@ public class TopicQueueMappingContext {
this
.
mappingDetail
=
mappingDetail
;
}
public
Immutable
List
<
LogicQueueMappingItem
>
getMappingItemList
()
{
public
List
<
LogicQueueMappingItem
>
getMappingItemList
()
{
return
mappingItemList
;
}
...
...
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
浏览文件 @
a8ef92e9
...
...
@@ -16,8 +16,6 @@
*/
package
org.apache.rocketmq.common.statictopic
;
import
com.google.common.collect.ImmutableList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
...
...
@@ -27,50 +25,45 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
// make sure this value is not null
private
ConcurrentMap
<
Integer
/*global id*/
,
ImmutableList
<
LogicQueueMappingItem
>>
hostedQueues
=
new
ConcurrentHashMap
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>();
private
ConcurrentMap
<
Integer
/*global id*/
,
List
<
LogicQueueMappingItem
>>
hostedQueues
=
new
ConcurrentHashMap
<
Integer
,
List
<
LogicQueueMappingItem
>>();
//make sure there is a default constructor
public
TopicQueueMappingDetail
()
{
}
public
TopicQueueMappingDetail
(
String
topic
,
int
totalQueues
,
String
bname
,
long
epoch
)
{
super
(
topic
,
totalQueues
,
bname
,
epoch
);
buildIdMap
();
}
public
boolean
putMappingInfo
(
Integer
globalId
,
Immutable
List
<
LogicQueueMappingItem
>
mappingInfo
)
{
public
static
boolean
putMappingInfo
(
TopicQueueMappingDetail
mappingDetail
,
Integer
globalId
,
List
<
LogicQueueMappingItem
>
mappingInfo
)
{
if
(
mappingInfo
.
isEmpty
())
{
return
true
;
}
hostedQueues
.
put
(
globalId
,
mappingInfo
);
buildIdMap
();
mappingDetail
.
hostedQueues
.
put
(
globalId
,
mappingInfo
);
return
true
;
}
public
void
buildIdMap
(
)
{
this
.
currIdMap
=
buildIdMap
(
LEVEL_0
);
public
static
List
<
LogicQueueMappingItem
>
getMappingInfo
(
TopicQueueMappingDetail
mappingDetail
,
Integer
globalId
)
{
return
mappingDetail
.
hostedQueues
.
get
(
globalId
);
}
public
ConcurrentMap
<
Integer
,
Integer
>
buildIdMap
(
int
level
)
{
public
static
ConcurrentMap
<
Integer
,
Integer
>
buildIdMap
(
TopicQueueMappingDetail
mappingDetail
,
int
level
)
{
//level 0 means current leader in this broker
//level 1 means previous leader in this broker, reserved for
assert
level
==
LEVEL_0
;
if
(
hostedQueues
==
null
||
hostedQueues
.
isEmpty
())
{
if
(
mappingDetail
.
hostedQueues
==
null
||
mappingDetail
.
hostedQueues
.
isEmpty
())
{
return
new
ConcurrentHashMap
<
Integer
,
Integer
>();
}
ConcurrentMap
<
Integer
,
Integer
>
tmpIdMap
=
new
ConcurrentHashMap
<
Integer
,
Integer
>();
for
(
Map
.
Entry
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>
entry:
hostedQueues
.
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
List
<
LogicQueueMappingItem
>>
entry:
mappingDetail
.
hostedQueues
.
entrySet
())
{
Integer
globalId
=
entry
.
getKey
();
Immutable
List
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
List
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
if
(
level
==
LEVEL_0
&&
items
.
size
()
>=
1
)
{
LogicQueueMappingItem
curr
=
items
.
get
(
items
.
size
()
-
1
);
if
(
bname
.
equals
(
curr
.
getBname
()))
{
if
(
mappingDetail
.
bname
.
equals
(
curr
.
getBname
()))
{
tmpIdMap
.
put
(
globalId
,
curr
.
getQueueId
());
}
}
...
...
@@ -78,14 +71,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return
tmpIdMap
;
}
public
ImmutableList
<
LogicQueueMappingItem
>
getMappingInfo
(
Integer
globalId
)
{
return
hostedQueues
.
get
(
globalId
);
}
public
static
LogicQueueMappingItem
findLogicQueueMappingItem
(
ImmutableList
<
LogicQueueMappingItem
>
mappingItems
,
long
logicOffset
)
{
public
static
LogicQueueMappingItem
findLogicQueueMappingItem
(
List
<
LogicQueueMappingItem
>
mappingItems
,
long
logicOffset
)
{
if
(
mappingItems
==
null
||
mappingItems
.
isEmpty
())
{
return
null
;
...
...
@@ -106,8 +93,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return
null
;
}
public
long
computeMaxOffsetFromMapping
(
Integer
globalId
)
{
List
<
LogicQueueMappingItem
>
mappingItems
=
getMappingInfo
(
globalId
);
public
static
long
computeMaxOffsetFromMapping
(
TopicQueueMappingDetail
mappingDetail
,
Integer
globalId
)
{
List
<
LogicQueueMappingItem
>
mappingItems
=
getMappingInfo
(
mappingDetail
,
globalId
);
if
(
mappingItems
==
null
||
mappingItems
.
isEmpty
())
{
return
-
1
;
...
...
@@ -117,24 +104,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
}
public
TopicQueueMappingInfo
cloneAsMappingInfo
(
)
{
TopicQueueMappingInfo
topicQueueMappingInfo
=
new
TopicQueueMappingInfo
(
this
.
topic
,
this
.
totalQueues
,
this
.
bname
,
this
.
epoch
);
topicQueueMappingInfo
.
currIdMap
=
this
.
buildIdMap
(
LEVEL_0
);
public
static
TopicQueueMappingInfo
cloneAsMappingInfo
(
TopicQueueMappingDetail
mappingDetail
)
{
TopicQueueMappingInfo
topicQueueMappingInfo
=
new
TopicQueueMappingInfo
(
mappingDetail
.
topic
,
mappingDetail
.
totalQueues
,
mappingDetail
.
bname
,
mappingDetail
.
epoch
);
topicQueueMappingInfo
.
currIdMap
=
TopicQueueMappingDetail
.
buildIdMap
(
mappingDetail
,
LEVEL_0
);
return
topicQueueMappingInfo
;
}
public
ConcurrentMap
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>
getHostedQueues
()
{
return
hostedQueues
;
public
static
boolean
checkIfAsPhysical
(
TopicQueueMappingDetail
mappingDetail
,
Integer
globalId
)
{
List
<
LogicQueueMappingItem
>
mappingItems
=
getMappingInfo
(
mappingDetail
,
globalId
);
return
mappingItems
==
null
||
(
mappingItems
.
size
()
==
1
&&
mappingItems
.
get
(
0
).
getLogicOffset
()
==
0
);
}
public
void
setHostedQueues
(
ConcurrentMap
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>
hostedQueues
)
{
this
.
hostedQueues
=
hostedQueues
;
public
ConcurrentMap
<
Integer
,
List
<
LogicQueueMappingItem
>>
getHostedQueues
(
)
{
return
hostedQueues
;
}
public
boolean
checkIfAsPhysical
(
Integer
globalId
)
{
List
<
LogicQueueMappingItem
>
mappingItems
=
getMappingInfo
(
globalId
);
return
mappingItems
==
null
||
(
mappingItems
.
size
()
==
1
&&
mappingItems
.
get
(
0
).
getLogicOffset
()
==
0
);
public
void
setHostedQueues
(
ConcurrentMap
<
Integer
,
List
<
LogicQueueMappingItem
>>
hostedQueues
)
{
this
.
hostedQueues
=
hostedQueues
;
}
}
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
浏览文件 @
a8ef92e9
...
...
@@ -30,7 +30,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
long
epoch
;
//important to fence the old dirty data
boolean
dirty
;
//indicate if the data is dirty
//register to broker to construct the route
transient
ConcurrentMap
<
Integer
/*logicId*/
,
Integer
/*physicalId*/
>
currIdMap
=
new
ConcurrentHashMap
<
Integer
,
Integer
>();
protected
ConcurrentMap
<
Integer
/*logicId*/
,
Integer
/*physicalId*/
>
currIdMap
=
new
ConcurrentHashMap
<
Integer
,
Integer
>();
public
TopicQueueMappingInfo
()
{
...
...
@@ -80,4 +80,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public
ConcurrentMap
<
Integer
,
Integer
>
getCurrIdMap
()
{
return
currIdMap
;
}
public
void
setTopic
(
String
topic
)
{
this
.
topic
=
topic
;
}
public
void
setBname
(
String
bname
)
{
this
.
bname
=
bname
;
}
public
void
setCurrIdMap
(
ConcurrentMap
<
Integer
,
Integer
>
currIdMap
)
{
this
.
currIdMap
=
currIdMap
;
}
}
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
浏览文件 @
a8ef92e9
...
...
@@ -19,14 +19,16 @@ package org.apache.rocketmq.common.statictopic;
import
com.google.common.collect.ImmutableList
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
java.util.List
;
public
class
TopicQueueMappingOne
extends
RemotingSerializable
{
String
topic
;
// redundant field
String
bname
;
//identify the hosted broker name
Integer
globalId
;
Immutable
List
<
LogicQueueMappingItem
>
items
;
List
<
LogicQueueMappingItem
>
items
;
public
TopicQueueMappingOne
(
String
topic
,
String
bname
,
Integer
globalId
,
Immutable
List
<
LogicQueueMappingItem
>
items
)
{
public
TopicQueueMappingOne
(
String
topic
,
String
bname
,
Integer
globalId
,
List
<
LogicQueueMappingItem
>
items
)
{
this
.
topic
=
topic
;
this
.
bname
=
bname
;
this
.
globalId
=
globalId
;
...
...
@@ -45,7 +47,7 @@ public class TopicQueueMappingOne extends RemotingSerializable {
return
globalId
;
}
public
Immutable
List
<
LogicQueueMappingItem
>
getItems
()
{
public
List
<
LogicQueueMappingItem
>
getItems
()
{
return
items
;
}
}
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
浏览文件 @
a8ef92e9
...
...
@@ -34,7 +34,6 @@ import java.util.Map;
import
java.util.Queue
;
import
java.util.Random
;
import
java.util.Set
;
import
java.util.stream.Collectors
;
public
class
TopicQueueMappingUtils
{
...
...
@@ -168,7 +167,7 @@ public class TopicQueueMappingUtils {
return
new
AbstractMap
.
SimpleEntry
<
Long
,
Integer
>(
maxEpoch
,
maxNum
);
}
public
static
void
makeSureLogicQueueMappingItemImmutable
(
ImmutableList
<
LogicQueueMappingItem
>
oldItems
,
Immutable
List
<
LogicQueueMappingItem
>
newItems
)
{
public
static
void
makeSureLogicQueueMappingItemImmutable
(
List
<
LogicQueueMappingItem
>
oldItems
,
List
<
LogicQueueMappingItem
>
newItems
)
{
if
(
oldItems
==
null
||
oldItems
.
isEmpty
())
{
return
;
}
...
...
@@ -198,7 +197,7 @@ public class TopicQueueMappingUtils {
}
public
static
void
checkLogicQueueMappingItemOffset
(
Immutable
List
<
LogicQueueMappingItem
>
items
)
{
public
static
void
checkLogicQueueMappingItemOffset
(
List
<
LogicQueueMappingItem
>
items
)
{
if
(
items
==
null
||
items
.
isEmpty
())
{
return
;
...
...
@@ -248,7 +247,7 @@ public class TopicQueueMappingUtils {
if
(
mappingDetail
.
totalQueues
>
maxNum
)
{
maxNum
=
mappingDetail
.
totalQueues
;
}
for
(
Map
.
Entry
<
Integer
,
Immutable
List
<
LogicQueueMappingItem
>>
entry
:
mappingDetail
.
getHostedQueues
().
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
List
<
LogicQueueMappingItem
>>
entry
:
mappingDetail
.
getHostedQueues
().
entrySet
())
{
Integer
globalid
=
entry
.
getKey
();
checkLogicQueueMappingItemOffset
(
entry
.
getValue
());
String
leaderBrokerName
=
getLeaderBroker
(
entry
.
getValue
());
...
...
@@ -278,10 +277,10 @@ public class TopicQueueMappingUtils {
return
globalIdMap
;
}
public
static
String
getLeaderBroker
(
Immutable
List
<
LogicQueueMappingItem
>
items
)
{
public
static
String
getLeaderBroker
(
List
<
LogicQueueMappingItem
>
items
)
{
return
getLeaderItem
(
items
).
getBname
();
}
public
static
LogicQueueMappingItem
getLeaderItem
(
Immutable
List
<
LogicQueueMappingItem
>
items
)
{
public
static
LogicQueueMappingItem
getLeaderItem
(
List
<
LogicQueueMappingItem
>
items
)
{
assert
items
.
size
()
>
0
;
return
items
.
get
(
items
.
size
()
-
1
);
}
...
...
@@ -367,7 +366,7 @@ public class TopicQueueMappingUtils {
configMapping
.
setReadQueueNums
(
configMapping
.
getReadQueueNums
()
+
1
);
}
LogicQueueMappingItem
mappingItem
=
new
LogicQueueMappingItem
(
0
,
configMapping
.
getWriteQueueNums
()
-
1
,
broker
,
0
,
0
,
-
1
,
-
1
,
-
1
);
configMapping
.
getMappingDetail
().
putMappingInfo
(
queueId
,
ImmutableList
.
of
(
mappingItem
));
TopicQueueMappingDetail
.
putMappingInfo
(
configMapping
.
getMappingDetail
(),
queueId
,
ImmutableList
.
of
(
mappingItem
));
}
// set the topic config
...
...
@@ -458,8 +457,8 @@ public class TopicQueueMappingUtils {
ImmutableList
<
LogicQueueMappingItem
>
resultItems
=
ImmutableList
.
copyOf
(
items
);
//Use the same object
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
mapOutConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
TopicQueueMappingDetail
.
putMappingInfo
(
mapInConfig
.
getMappingDetail
(),
queueId
,
resultItems
);
TopicQueueMappingDetail
.
putMappingInfo
(
mapOutConfig
.
getMappingDetail
(),
queueId
,
resultItems
);
}
for
(
Map
.
Entry
<
String
,
TopicConfigAndQueueMapping
>
entry
:
brokerConfigMap
.
entrySet
())
{
...
...
common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
浏览文件 @
a8ef92e9
...
...
@@ -8,17 +8,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.io.File
;
import
java.util.Map
;
public
class
TopicQueueMappingTest
{
@Test
public
void
testWriteToFile
()
{
System
.
out
.
println
(
System
.
getProperty
(
"java.io.tmpdir"
));
System
.
out
.
println
(
File
.
separator
);
}
@Test
public
void
testJsonSerialize
()
{
LogicQueueMappingItem
mappingItem
=
new
LogicQueueMappingItem
(
1
,
2
,
"broker01"
,
33333333333333333L
,
44444444444444444L
,
555555555555555555L
,
6666666666666666L
,
77777777777777777L
);
...
...
@@ -29,35 +22,33 @@ public class TopicQueueMappingTest {
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"bname"
),
mappingItem
.
getBname
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"gen"
),
mappingItem
.
getGen
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"logicOffset"
),
mappingItem
.
getLogicOffset
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"queueId"
),
mappingItem
.
getQueueId
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"startOffset"
),
mappingItem
.
getStartOffset
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"endOffset"
),
mappingItem
.
getEndOffset
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"timeOfStart"
),
mappingItem
.
getTimeOfStart
());
Assert
.
assertEquals
(
mappingItemMap
.
get
(
"timeOfEnd"
),
mappingItem
.
getTimeOfEnd
());
}
//test the decode encode
{
String
mappingItemJson2
=
RemotingSerializable
.
toJson
(
RemotingSerializable
.
decode
(
mappingItemJson
.
getBytes
(),
LogicQueueMappingItem
.
class
),
false
);
Assert
.
assertEquals
(
mappingItemJson
,
mappingItemJson2
);
LogicQueueMappingItem
mappingItemFromJson
=
RemotingSerializable
.
fromJson
(
mappingItemJson
,
LogicQueueMappingItem
.
class
);
Assert
.
assertEquals
(
mappingItemJson
,
RemotingSerializable
.
toJson
(
mappingItemFromJson
,
false
)
);
}
TopicQueueMappingDetail
mappingDetail
=
new
TopicQueueMappingDetail
(
"test"
,
1
,
"broker01"
,
System
.
currentTimeMillis
());
mappingDetail
.
putMappingInfo
(
0
,
ImmutableList
.
of
(
mappingItem
));
TopicQueueMappingDetail
.
putMappingInfo
(
mappingDetail
,
0
,
ImmutableList
.
of
(
mappingItem
));
String
mappingDetailJson
=
JSON
.
toJSONString
(
mappingDetail
);
{
Map
mappingDetailMap
=
JSON
.
parseObject
(
mappingDetailJson
);
Assert
.
assertFalse
(
mappingDetailMap
.
containsKey
(
"prevIdMap"
));
Assert
.
assertFalse
(
mappingDetailMap
.
containsKey
(
"currIdMap"
));
Assert
.
assertEquals
(
6
,
mappingDetailMap
.
size
());
Assert
.
assertTrue
(
mappingDetailMap
.
containsKey
(
"currIdMap"
));
Assert
.
assertEquals
(
7
,
mappingDetailMap
.
size
());
Assert
.
assertEquals
(
1
,
((
JSONObject
)
mappingDetailMap
.
get
(
"hostedQueues"
)).
size
());
Assert
.
assertEquals
(
1
,
((
JSONArray
)((
JSONObject
)
mappingDetailMap
.
get
(
"hostedQueues"
)).
get
(
"0"
)).
size
());
}
{
System
.
out
.
println
(
mappingDetailJson
);
TopicQueueMappingDetail
detailFromJson
=
RemotingSerializable
.
decode
(
mappingDetailJson
.
getBytes
(),
TopicQueueMappingDetail
.
class
);
System
.
out
.
println
(
JSON
.
toJSONString
(
detailFromJson
));
//Assert.assertEquals(1, detailFromJson.getHostedQueues().size());
//Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size());
TopicQueueMappingDetail
mappingDetailFromJson
=
RemotingSerializable
.
decode
(
mappingDetailJson
.
getBytes
(),
TopicQueueMappingDetail
.
class
);
Assert
.
assertEquals
(
1
,
mappingDetailFromJson
.
getHostedQueues
().
size
());
Assert
.
assertEquals
(
1
,
mappingDetailFromJson
.
getHostedQueues
().
get
(
0
).
size
());
Assert
.
assertEquals
(
mappingDetailJson
,
RemotingSerializable
.
toJson
(
mappingDetailFromJson
,
false
));
}
}
}
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
a8ef92e9
...
...
@@ -1137,8 +1137,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicStatsTable
statsTable
=
examineTopicStats
(
addr
,
topic
);
TopicConfigAndQueueMapping
mapOutConfig
=
brokerConfigMap
.
get
(
broker
);
for
(
Map
.
Entry
<
Integer
,
Immutable
List
<
LogicQueueMappingItem
>>
entry
:
mapOutConfig
.
getMappingDetail
().
getHostedQueues
().
entrySet
())
{
Immutable
List
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
for
(
Map
.
Entry
<
Integer
,
List
<
LogicQueueMappingItem
>>
entry
:
mapOutConfig
.
getMappingDetail
().
getHostedQueues
().
entrySet
())
{
List
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
Integer
globalId
=
entry
.
getKey
();
if
(
items
.
size
()
<
2
)
{
continue
;
...
...
@@ -1159,7 +1159,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
newLeader
.
setLogicOffset
(
TopicQueueMappingUtils
.
blockSeqRoundUp
(
oldLeader
.
computeStaticQueueOffset
(
topicOffset
.
getMaxOffset
()),
blockSeqSize
));
TopicConfigAndQueueMapping
mapInConfig
=
brokerConfigMap
.
get
(
newLeader
.
getBname
());
//fresh the new leader
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
globalId
,
items
);
TopicQueueMappingDetail
.
putMappingInfo
(
mapInConfig
.
getMappingDetail
(),
globalId
,
items
);
}
}
//Step4: write to the new leader with logic offset
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录