Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
2f4bf296
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
2f4bf296
编写于
11月 23, 2021
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Polish the code structure for static topic command
上级
3d685903
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
203 addition
and
263 deletion
+203
-263
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
...e/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+97
-4
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
...ketmq/common/statictopic/TopicRemappingDetailWrapper.java
+3
-15
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
...va/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+7
-4
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+61
-1
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
...main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+8
-1
tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
...q/tools/command/topic/RemappingStaticTopicSubCommand.java
+13
-160
tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
...etmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+14
-78
未找到文件。
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
浏览文件 @
2f4bf296
...
...
@@ -19,18 +19,19 @@ package org.apache.rocketmq.common.statictopic;
import
com.google.common.collect.ImmutableList
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.rpc.ClientMetadata
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
java.io.File
;
import
java.util.AbstractMap
;
import
java.util.ArrayDeque
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Queue
;
import
java.util.Random
;
import
java.util.Set
;
import
java.util.stream.Collectors
;
...
...
@@ -311,7 +312,7 @@ public class TopicQueueMappingUtils {
}
}
public
Map
<
String
,
TopicConfigAndQueueMapping
>
createTopicConfigMapping
(
String
topic
,
int
queueNum
,
Set
<
String
>
targetBrokers
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
)
{
public
static
TopicRemappingDetailWrapper
createTopicConfigMapping
(
String
topic
,
int
queueNum
,
Set
<
String
>
targetBrokers
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
)
{
Map
<
Integer
,
TopicQueueMappingOne
>
globalIdMap
=
new
HashMap
<
Integer
,
TopicQueueMappingOne
>();
Map
.
Entry
<
Long
,
Integer
>
maxEpochAndNum
=
new
AbstractMap
.
SimpleImmutableEntry
<
Long
,
Integer
>(
System
.
currentTimeMillis
(),
queueNum
);
if
(!
brokerConfigMap
.
isEmpty
())
{
...
...
@@ -379,7 +380,99 @@ public class TopicQueueMappingUtils {
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<
TopicQueueMappingDetail
>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
return
brokerConfigMap
;
return
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_CREATE_OR_UPDATE
,
newEpoch
,
brokerConfigMap
,
new
HashSet
<
String
>(),
new
HashSet
<
String
>());
}
public
static
TopicRemappingDetailWrapper
remappingStaticTopic
(
String
topic
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
Set
<
String
>
targetBrokers
)
{
final
Map
.
Entry
<
Long
,
Integer
>
maxEpochAndNum
=
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
final
Map
<
Integer
,
TopicQueueMappingOne
>
globalIdMap
=
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<
TopicQueueMappingDetail
>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
//the check is ok, now do the mapping allocation
int
maxNum
=
maxEpochAndNum
.
getValue
();
Map
<
String
,
Integer
>
brokerNumMap
=
new
HashMap
<
String
,
Integer
>();
for
(
String
broker:
targetBrokers
)
{
brokerNumMap
.
put
(
broker
,
0
);
}
TopicQueueMappingUtils
.
MappingAllocator
allocator
=
TopicQueueMappingUtils
.
buildMappingAllocator
(
new
HashMap
<
Integer
,
String
>(),
brokerNumMap
);
allocator
.
upToNum
(
maxNum
);
Map
<
String
,
Integer
>
expectedBrokerNumMap
=
allocator
.
getBrokerNumMap
();
Queue
<
Integer
>
waitAssignQueues
=
new
ArrayDeque
<
Integer
>();
Map
<
Integer
,
String
>
expectedIdToBroker
=
new
HashMap
<
Integer
,
String
>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
for
(
Map
.
Entry
<
Integer
,
TopicQueueMappingOne
>
entry
:
globalIdMap
.
entrySet
())
{
Integer
queueId
=
entry
.
getKey
();
TopicQueueMappingOne
mappingOne
=
entry
.
getValue
();
String
leaderBroker
=
mappingOne
.
getBname
();
if
(
expectedBrokerNumMap
.
containsKey
(
leaderBroker
))
{
if
(
expectedBrokerNumMap
.
get
(
leaderBroker
)
>
0
)
{
expectedIdToBroker
.
put
(
queueId
,
leaderBroker
);
expectedBrokerNumMap
.
put
(
leaderBroker
,
expectedBrokerNumMap
.
get
(
leaderBroker
)
-
1
);
}
else
{
waitAssignQueues
.
add
(
queueId
);
expectedBrokerNumMap
.
remove
(
leaderBroker
);
}
}
else
{
waitAssignQueues
.
add
(
queueId
);
}
}
for
(
Map
.
Entry
<
String
,
Integer
>
entry:
expectedBrokerNumMap
.
entrySet
())
{
String
broker
=
entry
.
getKey
();
Integer
queueNum
=
entry
.
getValue
();
for
(
int
i
=
0
;
i
<
queueNum
;
i
++)
{
Integer
queueId
=
waitAssignQueues
.
poll
();
assert
queueId
!=
null
;
expectedIdToBroker
.
put
(
queueId
,
broker
);
}
}
long
newEpoch
=
Math
.
max
(
maxEpochAndNum
.
getKey
()
+
1000
,
System
.
currentTimeMillis
());
//Now construct the remapping info
Set
<
String
>
brokersToMapOut
=
new
HashSet
<
String
>();
Set
<
String
>
brokersToMapIn
=
new
HashSet
<
String
>();
for
(
Map
.
Entry
<
Integer
,
String
>
mapEntry
:
expectedIdToBroker
.
entrySet
())
{
Integer
queueId
=
mapEntry
.
getKey
();
String
broker
=
mapEntry
.
getValue
();
TopicQueueMappingOne
topicQueueMappingOne
=
globalIdMap
.
get
(
queueId
);
assert
topicQueueMappingOne
!=
null
;
if
(
topicQueueMappingOne
.
getBname
().
equals
(
broker
))
{
continue
;
}
//remapping
final
String
mapInBroker
=
broker
;
final
String
mapOutBroker
=
topicQueueMappingOne
.
getBname
();
brokersToMapIn
.
add
(
mapInBroker
);
brokersToMapOut
.
add
(
mapOutBroker
);
TopicConfigAndQueueMapping
mapInConfig
=
brokerConfigMap
.
get
(
mapInBroker
);
TopicConfigAndQueueMapping
mapOutConfig
=
brokerConfigMap
.
get
(
mapOutBroker
);
mapInConfig
.
setWriteQueueNums
(
mapInConfig
.
getWriteQueueNums
()
+
1
);
mapInConfig
.
setWriteQueueNums
(
mapInConfig
.
getWriteQueueNums
()
+
1
);
List
<
LogicQueueMappingItem
>
items
=
new
ArrayList
<
LogicQueueMappingItem
>(
topicQueueMappingOne
.
getItems
());
LogicQueueMappingItem
last
=
items
.
get
(
items
.
size
()
-
1
);
items
.
add
(
new
LogicQueueMappingItem
(
last
.
getGen
()
+
1
,
mapInConfig
.
getWriteQueueNums
()
-
1
,
mapInBroker
,
-
1
,
0
,
-
1
,
-
1
,
-
1
));
ImmutableList
<
LogicQueueMappingItem
>
resultItems
=
ImmutableList
.
copyOf
(
items
);
//Use the same object
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
mapOutConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
}
for
(
Map
.
Entry
<
String
,
TopicConfigAndQueueMapping
>
entry
:
brokerConfigMap
.
entrySet
())
{
TopicConfigAndQueueMapping
configMapping
=
entry
.
getValue
();
configMapping
.
getMappingDetail
().
setEpoch
(
newEpoch
);
configMapping
.
getMappingDetail
().
setTotalQueues
(
maxNum
);
}
//double check
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<
TopicQueueMappingDetail
>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
return
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_REMAPPING
,
newEpoch
,
brokerConfigMap
,
brokersToMapIn
,
brokersToMapOut
);
}
}
common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
浏览文件 @
2f4bf296
...
...
@@ -18,7 +18,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private
final
String
topic
;
private
final
String
type
;
private
final
long
epoch
;
private
Map
<
Integer
,
String
>
expectedIdToBroker
=
new
HashMap
<
Integer
,
String
>();
private
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
=
new
HashMap
<
String
,
TopicConfigAndQueueMapping
>();
...
...
@@ -26,12 +25,13 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private
Set
<
String
>
brokerToMapOut
=
new
HashSet
<
String
>();
public
TopicRemappingDetailWrapper
(
String
topic
,
String
type
,
long
epoch
,
Map
<
Integer
,
String
>
expectedIdToBroker
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
)
{
public
TopicRemappingDetailWrapper
(
String
topic
,
String
type
,
long
epoch
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
Set
<
String
>
brokerToMapIn
,
Set
<
String
>
brokerToMapOut
)
{
this
.
topic
=
topic
;
this
.
type
=
type
;
this
.
epoch
=
epoch
;
this
.
expectedIdToBroker
=
expectedIdToBroker
;
this
.
brokerConfigMap
=
brokerConfigMap
;
this
.
brokerToMapIn
=
brokerToMapIn
;
this
.
brokerToMapOut
=
brokerToMapOut
;
}
public
String
getTopic
()
{
...
...
@@ -46,10 +46,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return
epoch
;
}
public
Map
<
Integer
,
String
>
getExpectedIdToBroker
()
{
return
expectedIdToBroker
;
}
public
Map
<
String
,
TopicConfigAndQueueMapping
>
getBrokerConfigMap
()
{
return
brokerConfigMap
;
}
...
...
@@ -58,15 +54,7 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return
brokerToMapIn
;
}
public
void
setBrokerToMapIn
(
Set
<
String
>
brokerToMapIn
)
{
this
.
brokerToMapIn
=
brokerToMapIn
;
}
public
Set
<
String
>
getBrokerToMapOut
()
{
return
brokerToMapOut
;
}
public
void
setBrokerToMapOut
(
Set
<
String
>
brokerToMapOut
)
{
this
.
brokerToMapOut
=
brokerToMapOut
;
}
}
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
浏览文件 @
2f4bf296
...
...
@@ -17,7 +17,6 @@
package
org.apache.rocketmq.tools.admin
;
import
java.io.UnsupportedEncodingException
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
...
...
@@ -29,7 +28,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.protocol.route.QueueData
;
import
org.apache.rocketmq.common.rpc.ClientMetadata
;
import
org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping
;
import
org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail
;
...
...
@@ -225,8 +223,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public
Map
<
String
,
TopicConfigAndQueueMapping
>
getTopicConfigMap
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
return
this
.
defaultMQAdminExtImpl
.
getTopicConfigMap
(
clientMetadata
,
topic
);
public
Map
<
String
,
TopicConfigAndQueueMapping
>
examineTopicConfigAll
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
return
this
.
defaultMQAdminExtImpl
.
examineTopicConfigAll
(
clientMetadata
,
topic
);
}
@Override
...
...
@@ -674,6 +672,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this
.
defaultMQAdminExtImpl
.
createStaticTopic
(
addr
,
defaultTopic
,
topicConfig
,
mappingDetail
,
force
);
}
@Override
public
void
remappingStaticTopic
(
ClientMetadata
clientMetadata
,
String
topic
,
Set
<
String
>
brokersToMapIn
,
Set
<
String
>
brokersToMapOut
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
int
blockSeqSize
,
boolean
force
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
this
.
defaultMQAdminExtImpl
.
remappingStaticTopic
(
clientMetadata
,
topic
,
brokersToMapIn
,
brokersToMapOut
,
brokerConfigMap
,
blockSeqSize
,
force
);
}
@Override
public
void
migrateTopicLogicalQueueNotify
(
String
brokerAddr
,
LogicalQueueRouteData
fromQueueRouteData
,
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
2f4bf296
...
...
@@ -1110,8 +1110,68 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this
.
mqClientInstance
.
getMQAdminImpl
().
createStaticTopic
(
addr
,
defaultTopic
,
topicConfig
,
mappingDetail
,
force
);
}
@Override
public
void
remappingStaticTopic
(
ClientMetadata
clientMetadata
,
String
topic
,
Set
<
String
>
brokersToMapIn
,
Set
<
String
>
brokersToMapOut
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
int
blockSeqSize
,
boolean
force
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
for
(
String
broker
:
brokerConfigMap
.
keySet
())
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
if
(
addr
==
null
)
{
throw
new
RuntimeException
(
"Can't find addr for broker "
+
broker
);
}
}
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for
(
String
broker:
brokersToMapIn
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
//Step2: forbid the write of old leader
for
(
String
broker:
brokersToMapOut
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
//Step3: decide the logic offset
for
(
String
broker:
brokersToMapOut
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicStatsTable
statsTable
=
examineTopicStats
(
addr
,
topic
);
TopicConfigAndQueueMapping
mapOutConfig
=
brokerConfigMap
.
get
(
broker
);
for
(
Map
.
Entry
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>
entry
:
mapOutConfig
.
getMappingDetail
().
getHostedQueues
().
entrySet
())
{
ImmutableList
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
Integer
globalId
=
entry
.
getKey
();
if
(
items
.
size
()
<
2
)
{
continue
;
}
LogicQueueMappingItem
newLeader
=
items
.
get
(
items
.
size
()
-
1
);
LogicQueueMappingItem
oldLeader
=
items
.
get
(
items
.
size
()
-
2
);
if
(
newLeader
.
getLogicOffset
()
>
0
)
{
continue
;
}
TopicOffset
topicOffset
=
statsTable
.
getOffsetTable
().
get
(
new
MessageQueue
(
topic
,
oldLeader
.
getBname
(),
oldLeader
.
getQueueId
()));
if
(
topicOffset
==
null
)
{
throw
new
RuntimeException
(
"Cannot get the max offset for old leader "
+
oldLeader
);
}
//TODO check the max offset, will it return -1?
if
(
topicOffset
.
getMaxOffset
()
<
oldLeader
.
getStartOffset
())
{
throw
new
RuntimeException
(
"The max offset is smaller then the start offset "
+
oldLeader
+
" "
+
topicOffset
.
getMaxOffset
());
}
newLeader
.
setLogicOffset
(
TopicQueueMappingUtils
.
blockSeqRoundUp
(
oldLeader
.
computeStaticQueueOffset
(
topicOffset
.
getMaxOffset
()),
blockSeqSize
));
TopicConfigAndQueueMapping
mapInConfig
=
brokerConfigMap
.
get
(
newLeader
.
getBname
());
//fresh the new leader
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
globalId
,
items
);
}
}
//Step4: write to the new leader with logic offset
for
(
String
broker:
brokersToMapIn
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
}
@Override
public
Map
<
String
,
TopicConfigAndQueueMapping
>
getTopicConfigMap
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
public
Map
<
String
,
TopicConfigAndQueueMapping
>
examineTopicConfigAll
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
TopicRouteData
routeData
=
examineTopicRouteInfo
(
topic
);
clientMetadata
.
freshTopicRoute
(
topic
,
routeData
);
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
=
new
HashMap
<>();
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
浏览文件 @
2f4bf296
...
...
@@ -21,13 +21,17 @@ import java.util.List;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
com.google.common.collect.ImmutableList
;
import
org.apache.rocketmq.client.MQAdmin
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.admin.TopicOffset
;
import
org.apache.rocketmq.common.rpc.ClientMetadata
;
import
org.apache.rocketmq.common.statictopic.LogicQueueMappingItem
;
import
org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping
;
import
org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
...
...
@@ -56,6 +60,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
...
...
@@ -342,9 +347,11 @@ public interface MQAdminExt extends MQAdmin {
void
createStaticTopic
(
final
String
addr
,
final
String
defaultTopic
,
final
TopicConfig
topicConfig
,
final
TopicQueueMappingDetail
mappingDetail
,
final
boolean
force
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
Map
<
String
,
TopicConfigAndQueueMapping
>
getTopicConfigMap
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
Map
<
String
,
TopicConfigAndQueueMapping
>
examineTopicConfigAll
(
ClientMetadata
clientMetadata
,
String
topic
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
remappingStaticTopic
(
ClientMetadata
clientMetadata
,
String
topic
,
Set
<
String
>
brokersToMapIn
,
Set
<
String
>
brokersToMapOut
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
int
blockSeqSize
,
boolean
force
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
migrateTopicLogicalQueueNotify
(
String
brokerAddr
,
LogicalQueueRouteData
fromQueueRouteData
,
LogicalQueueRouteData
toQueueRouteData
)
throws
InterruptedException
,
RemotingConnectException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
MQBrokerException
;
}
tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
浏览文件 @
2f4bf296
...
...
@@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import
org.apache.rocketmq.tools.command.SubCommand
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
import
java.util.AbstractMap
;
import
java.util.ArrayDeque
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
...
...
@@ -121,7 +122,13 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if
(
commandLine
.
hasOption
(
"fr"
)
&&
Boolean
.
parseBoolean
(
commandLine
.
getOptionValue
(
"fr"
).
trim
()))
{
force
=
true
;
}
doRemapping
(
topic
,
wrapper
.
getBrokerToMapIn
(),
wrapper
.
getBrokerToMapOut
(),
wrapper
.
getBrokerConfigMap
(),
clientMetadata
,
defaultMQAdminExt
,
force
);
for
(
String
broker
:
wrapper
.
getBrokerConfigMap
().
keySet
())
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
if
(
addr
==
null
)
{
throw
new
RuntimeException
(
"Can't find addr for broker "
+
broker
);
}
}
defaultMQAdminExt
.
remappingStaticTopic
(
clientMetadata
,
topic
,
wrapper
.
getBrokerToMapIn
(),
wrapper
.
getBrokerToMapOut
(),
wrapper
.
getBrokerConfigMap
(),
10000
,
force
);
return
;
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
...
...
@@ -131,68 +138,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
public
void
doRemapping
(
String
topic
,
Set
<
String
>
brokersToMapIn
,
Set
<
String
>
brokersToMapOut
,
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
ClientMetadata
clientMetadata
,
DefaultMQAdminExt
defaultMQAdminExt
,
boolean
force
)
throws
Exception
{
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for
(
String
broker:
brokersToMapIn
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
defaultMQAdminExt
.
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
//Step2: forbid the write of old leader
for
(
String
broker:
brokersToMapOut
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
defaultMQAdminExt
.
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
//Step3: decide the logic offset
for
(
String
broker:
brokersToMapOut
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicStatsTable
statsTable
=
defaultMQAdminExt
.
examineTopicStats
(
addr
,
topic
);
TopicConfigAndQueueMapping
mapOutConfig
=
brokerConfigMap
.
get
(
broker
);
for
(
Map
.
Entry
<
Integer
,
ImmutableList
<
LogicQueueMappingItem
>>
entry
:
mapOutConfig
.
getMappingDetail
().
getHostedQueues
().
entrySet
())
{
ImmutableList
<
LogicQueueMappingItem
>
items
=
entry
.
getValue
();
Integer
globalId
=
entry
.
getKey
();
if
(
items
.
size
()
<
2
)
{
continue
;
}
LogicQueueMappingItem
newLeader
=
items
.
get
(
items
.
size
()
-
1
);
LogicQueueMappingItem
oldLeader
=
items
.
get
(
items
.
size
()
-
2
);
if
(
newLeader
.
getLogicOffset
()
>
0
)
{
continue
;
}
TopicOffset
topicOffset
=
statsTable
.
getOffsetTable
().
get
(
new
MessageQueue
(
topic
,
oldLeader
.
getBname
(),
oldLeader
.
getQueueId
()));
if
(
topicOffset
==
null
)
{
throw
new
RuntimeException
(
"Cannot get the max offset for old leader "
+
oldLeader
);
}
//TODO check the max offset, will it return -1?
if
(
topicOffset
.
getMaxOffset
()
<
oldLeader
.
getStartOffset
())
{
throw
new
RuntimeException
(
"The max offset is smaller then the start offset "
+
oldLeader
+
" "
+
topicOffset
.
getMaxOffset
());
}
newLeader
.
setLogicOffset
(
TopicQueueMappingUtils
.
blockSeqRoundUp
(
oldLeader
.
computeStaticQueueOffset
(
topicOffset
.
getMaxOffset
()),
10000
));
TopicConfigAndQueueMapping
mapInConfig
=
brokerConfigMap
.
get
(
newLeader
.
getBname
());
//fresh the new leader
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
globalId
,
items
);
}
}
//Step4: write to the new leader with logic offset
for
(
String
broker:
brokersToMapIn
)
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
TopicConfigAndQueueMapping
configMapping
=
brokerConfigMap
.
get
(
broker
);
defaultMQAdminExt
.
createStaticTopic
(
addr
,
defaultMQAdminExt
.
getCreateTopicKey
(),
configMapping
,
configMapping
.
getMappingDetail
(),
force
);
}
}
@Override
public
void
execute
(
final
CommandLine
commandLine
,
final
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
if
(!
commandLine
.
hasOption
(
't'
))
{
ServerUtil
.
printCommandLineHelp
(
"mqadmin "
+
this
.
commandName
(),
options
);
return
;
...
...
@@ -250,120 +198,25 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
}
//get the existed topic config and mapping
{
TopicRouteData
routeData
=
defaultMQAdminExt
.
examineTopicRouteInfo
(
topic
);
clientMetadata
.
freshTopicRoute
(
topic
,
routeData
);
if
(
routeData
!=
null
&&
!
routeData
.
getQueueDatas
().
isEmpty
())
{
for
(
QueueData
queueData:
routeData
.
getQueueDatas
())
{
String
bname
=
queueData
.
getBrokerName
();
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
bname
);
TopicConfigAndQueueMapping
mapping
=
(
TopicConfigAndQueueMapping
)
defaultMQAdminExt
.
examineTopicConfig
(
addr
,
topic
);
//allow the config is null
if
(
mapping
!=
null
)
{
brokerConfigMap
.
put
(
bname
,
mapping
);
}
}
}
}
brokerConfigMap
=
defaultMQAdminExt
.
examineTopicConfigAll
(
clientMetadata
,
topic
);
if
(
brokerConfigMap
.
isEmpty
())
{
throw
new
RuntimeException
(
"No topic route to do the remapping"
);
}
final
Map
.
Entry
<
Long
,
Integer
>
maxEpochAndNum
=
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
globalIdMap
=
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
//the check is ok, now do the mapping allocation
int
maxNum
=
maxEpochAndNum
.
getValue
();
long
maxEpoch
=
maxEpochAndNum
.
getKey
();
Map
.
Entry
<
Long
,
Integer
>
maxEpochAndNum
=
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
{
TopicRemappingDetailWrapper
oldWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_
REMAPPING
,
maxEpoch
,
new
HashMap
<>(),
brokerConfigMap
);
TopicRemappingDetailWrapper
oldWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_
CREATE_OR_UPDATE
,
maxEpochAndNum
.
getKey
(),
brokerConfigMap
,
new
HashSet
<
String
>(),
new
HashSet
<
String
>()
);
String
oldMappingDataFile
=
TopicQueueMappingUtils
.
writeToTemp
(
oldWrapper
,
false
);
System
.
out
.
println
(
"The old mapping data is written to file "
+
oldMappingDataFile
);
}
TopicQueueMappingUtils
.
MappingAllocator
allocator
=
TopicQueueMappingUtils
.
buildMappingAllocator
(
new
HashMap
<>(),
targetBrokers
.
stream
().
collect
(
Collectors
.
toMap
(
x
->
x
,
x
->
0
)));
allocator
.
upToNum
(
maxNum
);
Map
<
String
,
Integer
>
expectedBrokerNumMap
=
allocator
.
getBrokerNumMap
();
Queue
<
Integer
>
waitAssignQueues
=
new
ArrayDeque
<
Integer
>();
Map
<
Integer
,
String
>
expectedIdToBroker
=
new
HashMap
<>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
globalIdMap
.
forEach
((
queueId
,
mappingOne
)
->
{
String
leaderBroker
=
mappingOne
.
getBname
();
if
(
expectedBrokerNumMap
.
containsKey
(
leaderBroker
))
{
if
(
expectedBrokerNumMap
.
get
(
leaderBroker
)
>
0
)
{
expectedIdToBroker
.
put
(
queueId
,
leaderBroker
);
expectedBrokerNumMap
.
put
(
leaderBroker
,
expectedBrokerNumMap
.
get
(
leaderBroker
)
-
1
);
}
else
{
waitAssignQueues
.
add
(
queueId
);
expectedBrokerNumMap
.
remove
(
leaderBroker
);
}
}
else
{
waitAssignQueues
.
add
(
queueId
);
}
});
expectedBrokerNumMap
.
forEach
((
broker
,
queueNum
)
->
{
for
(
int
i
=
0
;
i
<
queueNum
;
i
++)
{
Integer
queueId
=
waitAssignQueues
.
poll
();
assert
queueId
!=
null
;
expectedIdToBroker
.
put
(
queueId
,
broker
);
}
});
long
newEpoch
=
Math
.
max
(
maxEpochAndNum
.
getKey
()
+
1000
,
System
.
currentTimeMillis
());
//Now construct the remapping info
Set
<
String
>
brokersToMapOut
=
new
HashSet
<>();
Set
<
String
>
brokersToMapIn
=
new
HashSet
<>();
for
(
Map
.
Entry
<
Integer
,
String
>
mapEntry
:
expectedIdToBroker
.
entrySet
())
{
Integer
queueId
=
mapEntry
.
getKey
();
String
broker
=
mapEntry
.
getValue
();
TopicQueueMappingOne
topicQueueMappingOne
=
globalIdMap
.
get
(
queueId
);
assert
topicQueueMappingOne
!=
null
;
if
(
topicQueueMappingOne
.
getBname
().
equals
(
broker
))
{
continue
;
}
//remapping
String
mapInBroker
=
broker
;
String
mapOutBroker
=
topicQueueMappingOne
.
getBname
();
brokersToMapIn
.
add
(
mapInBroker
);
brokersToMapOut
.
add
(
mapOutBroker
);
TopicConfigAndQueueMapping
mapInConfig
=
brokerConfigMap
.
get
(
mapInBroker
);
TopicConfigAndQueueMapping
mapOutConfig
=
brokerConfigMap
.
get
(
mapOutBroker
);
mapInConfig
.
setWriteQueueNums
(
mapInConfig
.
getWriteQueueNums
()
+
1
);
mapInConfig
.
setWriteQueueNums
(
mapInConfig
.
getWriteQueueNums
()
+
1
);
List
<
LogicQueueMappingItem
>
items
=
new
ArrayList
<>(
topicQueueMappingOne
.
getItems
());
LogicQueueMappingItem
last
=
items
.
get
(
items
.
size
()
-
1
);
items
.
add
(
new
LogicQueueMappingItem
(
last
.
getGen
()
+
1
,
mapInConfig
.
getWriteQueueNums
()
-
1
,
mapInBroker
,
-
1
,
0
,
-
1
,
-
1
,
-
1
));
ImmutableList
<
LogicQueueMappingItem
>
resultItems
=
ImmutableList
.
copyOf
(
items
);
//Use the same object
mapInConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
mapOutConfig
.
getMappingDetail
().
putMappingInfo
(
queueId
,
resultItems
);
}
brokerConfigMap
.
values
().
forEach
(
configMapping
->
{
configMapping
.
getMappingDetail
().
setEpoch
(
newEpoch
);
configMapping
.
getMappingDetail
().
setTotalQueues
(
maxNum
);
});
//double check
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<>(
brokerConfigMap
.
values
().
stream
().
map
(
TopicConfigAndQueueMapping:
:
getMappingDetail
).
collect
(
Collectors
.
toList
())),
false
,
true
);
TopicRemappingDetailWrapper
newWrapper
=
TopicQueueMappingUtils
.
remappingStaticTopic
(
topic
,
brokerConfigMap
,
targetBrokers
);
{
TopicRemappingDetailWrapper
newWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_REMAPPING
,
newEpoch
,
expectedIdToBroker
,
brokerConfigMap
);
newWrapper
.
setBrokerToMapIn
(
brokersToMapIn
);
newWrapper
.
setBrokerToMapOut
(
brokersToMapOut
);
String
newMappingDataFile
=
TopicQueueMappingUtils
.
writeToTemp
(
newWrapper
,
true
);
System
.
out
.
println
(
"The old mapping data is written to file "
+
newMappingDataFile
);
}
d
oRemapping
(
topic
,
brokersToMapIn
,
brokersToMapOut
,
brokerConfigMap
,
clientMetadata
,
defaultMQAdminExt
,
false
);
d
efaultMQAdminExt
.
remappingStaticTopic
(
clientMetadata
,
topic
,
newWrapper
.
getBrokerToMapIn
(),
newWrapper
.
getBrokerToMapOut
(),
newWrapper
.
getBrokerConfigMap
(),
10000
,
false
);
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
浏览文件 @
2f4bf296
...
...
@@ -120,6 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw
new
RuntimeException
(
"The Cluster info is empty"
);
}
clientMetadata
.
refreshClusterInfo
(
clusterInfo
);
doUpdate
(
wrapper
.
getBrokerConfigMap
(),
clientMetadata
,
defaultMQAdminExt
,
force
);
return
;
}
catch
(
Exception
e
)
{
...
...
@@ -130,6 +131,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
public
void
doUpdate
(
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
,
ClientMetadata
clientMetadata
,
DefaultMQAdminExt
defaultMQAdminExt
,
boolean
force
)
throws
Exception
{
//check it before
for
(
String
broker
:
brokerConfigMap
.
keySet
())
{
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
broker
);
if
(
addr
==
null
)
{
throw
new
RuntimeException
(
"Can't find addr for broker "
+
broker
);
}
}
//If some succeed, and others fail, it will cause inconsistent data
for
(
Map
.
Entry
<
String
,
TopicConfigAndQueueMapping
>
entry
:
brokerConfigMap
.
entrySet
())
{
String
broker
=
entry
.
getKey
();
...
...
@@ -158,7 +166,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata
clientMetadata
=
new
ClientMetadata
();
Map
<
String
,
TopicConfigAndQueueMapping
>
brokerConfigMap
=
new
HashMap
<>();
Map
<
Integer
,
TopicQueueMappingOne
>
globalIdMap
=
new
HashMap
<>();
Set
<
String
>
targetBrokers
=
new
HashSet
<>();
try
{
...
...
@@ -202,101 +209,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//get the existed topic config and mapping
brokerConfigMap
=
defaultMQAdminExt
.
examineTopicConfigAll
(
clientMetadata
,
topic
);
int
queueNum
=
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
"qn"
).
trim
());
{
TopicRouteData
routeData
=
defaultMQAdminExt
.
examineTopicRouteInfo
(
topic
);
clientMetadata
.
freshTopicRoute
(
topic
,
routeData
);
if
(
routeData
!=
null
&&
!
routeData
.
getQueueDatas
().
isEmpty
())
{
for
(
QueueData
queueData:
routeData
.
getQueueDatas
())
{
String
bname
=
queueData
.
getBrokerName
();
String
addr
=
clientMetadata
.
findMasterBrokerAddr
(
bname
);
TopicConfigAndQueueMapping
mapping
=
(
TopicConfigAndQueueMapping
)
defaultMQAdminExt
.
examineTopicConfig
(
addr
,
topic
);
//allow the config is null
if
(
mapping
!=
null
)
{
brokerConfigMap
.
put
(
bname
,
mapping
);
}
}
}
}
Map
.
Entry
<
Long
,
Integer
>
maxEpochAndNum
=
new
AbstractMap
.
SimpleImmutableEntry
<>(
System
.
currentTimeMillis
(),
queueNum
);
if
(!
brokerConfigMap
.
isEmpty
())
{
maxEpochAndNum
=
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
globalIdMap
=
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
}
if
(
queueNum
<
globalIdMap
.
size
())
{
throw
new
RuntimeException
(
String
.
format
(
"Cannot decrease the queue num for static topic %d < %d"
,
queueNum
,
globalIdMap
.
size
()));
}
//check the queue number
if
(
queueNum
==
globalIdMap
.
size
())
{
throw
new
RuntimeException
(
"The topic queue num is equal the existed queue num, do nothing"
);
}
{
TopicRemappingDetailWrapper
oldWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_CREATE_OR_UPDATE
,
maxEpochAndNum
.
getKey
(),
new
HashMap
<>(),
brokerConfigMap
);
TopicRemappingDetailWrapper
oldWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_CREATE_OR_UPDATE
,
maxEpochAndNum
.
getKey
(),
brokerConfigMap
,
new
HashSet
<
String
>(),
new
HashSet
<
String
>()
);
String
oldMappingDataFile
=
TopicQueueMappingUtils
.
writeToTemp
(
oldWrapper
,
false
);
System
.
out
.
println
(
"The old mapping data is written to file "
+
oldMappingDataFile
);
}
//the check is ok, now do the mapping allocation
Map
<
String
,
Integer
>
brokerNumMap
=
targetBrokers
.
stream
().
collect
(
Collectors
.
toMap
(
x
->
x
,
x
->
0
));
final
Map
<
Integer
,
String
>
oldIdToBroker
=
new
HashMap
<>();
globalIdMap
.
forEach
((
key
,
value
)
->
{
String
leaderbroker
=
value
.
getBname
();
oldIdToBroker
.
put
(
key
,
leaderbroker
);
if
(!
brokerNumMap
.
containsKey
(
leaderbroker
))
{
brokerNumMap
.
put
(
leaderbroker
,
1
);
}
else
{
brokerNumMap
.
put
(
leaderbroker
,
brokerNumMap
.
get
(
leaderbroker
)
+
1
);
}
});
TopicQueueMappingUtils
.
MappingAllocator
allocator
=
TopicQueueMappingUtils
.
buildMappingAllocator
(
oldIdToBroker
,
brokerNumMap
);
allocator
.
upToNum
(
queueNum
);
Map
<
Integer
,
String
>
newIdToBroker
=
allocator
.
getIdToBroker
();
//construct the topic configAndMapping
long
newEpoch
=
Math
.
max
(
maxEpochAndNum
.
getKey
()
+
1000
,
System
.
currentTimeMillis
());
for
(
Map
.
Entry
<
Integer
,
String
>
e
:
newIdToBroker
.
entrySet
())
{
Integer
queueId
=
e
.
getKey
();
String
broker
=
e
.
getValue
();
if
(
globalIdMap
.
containsKey
(
queueId
))
{
//ignore the exited
continue
;
}
TopicConfigAndQueueMapping
configMapping
;
if
(!
brokerConfigMap
.
containsKey
(
broker
))
{
configMapping
=
new
TopicConfigAndQueueMapping
(
new
TopicConfig
(
topic
),
new
TopicQueueMappingDetail
(
topic
,
0
,
broker
,
-
1
));
configMapping
.
setWriteQueueNums
(
1
);
configMapping
.
setReadQueueNums
(
1
);
brokerConfigMap
.
put
(
broker
,
configMapping
);
}
else
{
configMapping
=
brokerConfigMap
.
get
(
broker
);
configMapping
.
setWriteQueueNums
(
configMapping
.
getWriteQueueNums
()
+
1
);
configMapping
.
setReadQueueNums
(
configMapping
.
getReadQueueNums
()
+
1
);
}
LogicQueueMappingItem
mappingItem
=
new
LogicQueueMappingItem
(
0
,
configMapping
.
getWriteQueueNums
()
-
1
,
broker
,
0
,
0
,
-
1
,
-
1
,
-
1
);
configMapping
.
getMappingDetail
().
putMappingInfo
(
queueId
,
ImmutableList
.
of
(
mappingItem
));
}
// set the topic config
brokerConfigMap
.
values
().
forEach
(
configMapping
->
{
configMapping
.
getMappingDetail
().
setEpoch
(
newEpoch
);
configMapping
.
getMappingDetail
().
setTotalQueues
(
queueNum
);
});
//double check the config
TopicQueueMappingUtils
.
checkConsistenceOfTopicConfigAndQueueMapping
(
topic
,
brokerConfigMap
);
TopicQueueMappingUtils
.
checkAndBuildMappingItems
(
new
ArrayList
<>(
TopicQueueMappingUtils
.
getMappingDetailFromConfig
(
brokerConfigMap
.
values
())),
false
,
true
);
//calculate the new data
TopicRemappingDetailWrapper
newWrapper
=
TopicQueueMappingUtils
.
createTopicConfigMapping
(
topic
,
queueNum
,
targetBrokers
,
brokerConfigMap
);
{
TopicRemappingDetailWrapper
newWrapper
=
new
TopicRemappingDetailWrapper
(
topic
,
TopicRemappingDetailWrapper
.
TYPE_CREATE_OR_UPDATE
,
newEpoch
,
newIdToBroker
,
brokerConfigMap
);
String
newMappingDataFile
=
TopicQueueMappingUtils
.
writeToTemp
(
newWrapper
,
true
);
System
.
out
.
println
(
"The new mapping data is written to file "
+
newMappingDataFile
);
}
doUpdate
(
brokerConfigMap
,
clientMetadata
,
defaultMQAdminExt
,
false
);
doUpdate
(
newWrapper
.
getBrokerConfigMap
()
,
clientMetadata
,
defaultMQAdminExt
,
false
);
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录