Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
0846c3c6
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
0846c3c6
编写于
6月 23, 2021
作者:
C
chenzlalvin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-21] submodule test
上级
f3990857
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
1246 addition
and
32 deletion
+1246
-32
test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
.../src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+5
-4
test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
...src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+69
-26
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
...va/org/apache/rocketmq/test/base/IntegrationTestBase.java
+2
-2
test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
...t/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
+1170
-0
未找到文件。
test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
浏览文件 @
0846c3c6
...
...
@@ -20,6 +20,7 @@ package org.apache.rocketmq.test.util;
import
java.util.HashMap
;
import
java.util.Set
;
import
java.util.UUID
;
import
java.util.concurrent.ForkJoinPool
;
import
org.apache.log4j.Logger
;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
...
...
@@ -60,7 +61,7 @@ public class MQAdmin {
}
}
mqAdminExt
.
shutdown
(
);
ForkJoinPool
.
commonPool
().
execute
(
mqAdminExt:
:
shutdown
);
return
createResult
;
}
...
...
@@ -99,7 +100,7 @@ public class MQAdmin {
createResult
=
false
;
e
.
printStackTrace
();
}
mqAdminExt
.
shutdown
(
);
ForkJoinPool
.
commonPool
().
execute
(
mqAdminExt:
:
shutdown
);
return
createResult
;
}
...
...
@@ -113,7 +114,7 @@ public class MQAdmin {
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
mqAdminExt
.
shutdown
(
);
ForkJoinPool
.
commonPool
().
execute
(
mqAdminExt:
:
shutdown
);
return
clusterInfo
;
}
...
...
@@ -159,7 +160,7 @@ public class MQAdmin {
createResult
=
false
;
e
.
printStackTrace
();
}
mqAdminExt
.
shutdown
(
);
ForkJoinPool
.
commonPool
().
execute
(
mqAdminExt:
:
shutdown
);
}
}
test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
浏览文件 @
0846c3c6
...
...
@@ -17,13 +17,23 @@
package
org.apache.rocketmq.test.base
;
import
com.google.common.collect.ImmutableList
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ForkJoinPool
;
import
java.util.concurrent.TimeUnit
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
org.apache.log4j.Logger
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.client.consumer.MQPullConsumer
;
import
org.apache.rocketmq.client.consumer.MQPushConsumer
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.MQProducer
;
import
org.apache.rocketmq.client.producer.TransactionListener
;
import
org.apache.rocketmq.common.MQVersion
;
import
org.apache.rocketmq.common.protocol.route.BrokerData
;
import
org.apache.rocketmq.namesrv.NamesrvController
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer
;
...
...
@@ -36,21 +46,28 @@ import org.apache.rocketmq.test.factory.ConsumerFactory;
import
org.apache.rocketmq.test.listener.AbstractListener
;
import
org.apache.rocketmq.test.util.MQAdmin
;
import
org.apache.rocketmq.test.util.MQRandomUtils
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.admin.MQAdminExt
;
import
static
org
.
awaitility
.
Awaitility
.
await
;
public
class
BaseConf
{
public
static
String
nsAddr
;
protected
static
String
broker1Name
;
protected
static
String
broker2Name
;
protected
static
String
clusterName
;
protected
static
int
brokerNum
;
protected
static
int
waitTime
=
5
;
protected
static
int
consumeTime
=
2
*
60
*
1000
;
protected
static
NamesrvController
namesrvController
;
protected
static
BrokerController
brokerController1
;
protected
static
BrokerController
brokerController2
;
protected
static
List
<
Object
>
mqClients
=
new
ArrayList
<
Object
>();
protected
static
boolean
debug
=
false
;
private
static
Logger
log
=
Logger
.
getLogger
(
BaseConf
.
class
);
public
final
static
String
nsAddr
;
protected
final
static
String
broker1Name
;
protected
final
static
String
broker2Name
;
protected
final
static
String
clusterName
;
protected
final
static
int
brokerNum
;
protected
final
static
int
waitTime
=
5
;
protected
final
static
int
consumeTime
=
2
*
60
*
1000
;
protected
final
static
int
QUEUE_NUMBERS
=
8
;
protected
final
static
NamesrvController
namesrvController
;
protected
final
static
BrokerController
brokerController1
;
protected
final
static
BrokerController
brokerController2
;
protected
final
static
List
<
BrokerController
>
brokerControllerList
;
protected
final
static
Map
<
String
,
BrokerController
>
brokerControllerMap
;
protected
final
static
List
<
Object
>
mqClients
=
new
ArrayList
<
Object
>();
protected
final
static
boolean
debug
=
false
;
private
final
static
Logger
log
=
Logger
.
getLogger
(
BaseConf
.
class
);
static
{
System
.
setProperty
(
RemotingCommand
.
REMOTING_VERSION_KEY
,
Integer
.
toString
(
MQVersion
.
CURRENT_VERSION
));
...
...
@@ -62,14 +79,32 @@ public class BaseConf {
broker1Name
=
brokerController1
.
getBrokerConfig
().
getBrokerName
();
broker2Name
=
brokerController2
.
getBrokerConfig
().
getBrokerName
();
brokerNum
=
2
;
brokerControllerList
=
ImmutableList
.
of
(
brokerController1
,
brokerController2
);
brokerControllerMap
=
brokerControllerList
.
stream
().
collect
(
Collectors
.
toMap
(
input
->
input
.
getBrokerConfig
().
getBrokerName
(),
Function
.
identity
()));
}
public
BaseConf
()
{
}
// This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
public
static
void
waitBrokerRegistered
(
final
String
nsAddr
,
final
String
clusterName
)
{
final
DefaultMQAdminExt
mqAdminExt
=
new
DefaultMQAdminExt
(
500
);
mqAdminExt
.
setNamesrvAddr
(
nsAddr
);
try
{
mqAdminExt
.
start
();
await
().
atMost
(
30
,
TimeUnit
.
SECONDS
).
until
(()
->
{
List
<
BrokerData
>
brokerDatas
=
mqAdminExt
.
examineTopicRouteInfo
(
clusterName
).
getBrokerDatas
();
return
brokerDatas
.
size
()
==
brokerNum
;
});
}
catch
(
MQClientException
e
)
{
log
.
error
(
"init failed, please check BaseConf"
);
}
ForkJoinPool
.
commonPool
().
execute
(
mqAdminExt:
:
shutdown
);
}
public
static
String
initTopic
()
{
String
topic
=
MQRandomUtils
.
getRandomTopic
();
String
topic
=
"tt-"
+
MQRandomUtils
.
getRandomTopic
();
IntegrationTestBase
.
initTopic
(
topic
,
nsAddr
,
clusterName
);
return
topic
;
...
...
@@ -157,18 +192,26 @@ public class BaseConf {
}
public
static
void
shutdown
()
{
try
{
for
(
Object
mqClient
:
mqClients
)
{
ImmutableList
<
Object
>
mqClients
=
ImmutableList
.
copyOf
(
BaseConf
.
mqClients
);
BaseConf
.
mqClients
.
clear
();
shutdown
(
mqClients
);
}
public
static
void
shutdown
(
List
<
Object
>
mqClients
)
{
mqClients
.
forEach
(
mqClient
->
ForkJoinPool
.
commonPool
().
execute
(()
->
{
if
(
mqClient
instanceof
AbstractMQProducer
)
{
((
AbstractMQProducer
)
mqClient
).
shutdown
();
}
else
{
}
else
if
(
mqClient
instanceof
AbstractMQConsumer
)
{
((
AbstractMQConsumer
)
mqClient
).
shutdown
();
}
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
else
if
(
mqClient
instanceof
MQAdminExt
)
{
((
MQAdminExt
)
mqClient
).
shutdown
();
}
else
if
(
mqClient
instanceof
MQProducer
)
{
((
MQProducer
)
mqClient
).
shutdown
();
}
else
if
(
mqClient
instanceof
MQPullConsumer
)
{
((
MQPullConsumer
)
mqClient
).
shutdown
();
}
else
if
(
mqClient
instanceof
MQPushConsumer
)
{
((
MQPushConsumer
)
mqClient
).
shutdown
();
}
}));
}
}
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
浏览文件 @
0846c3c6
...
...
@@ -47,7 +47,7 @@ public class IntegrationTestBase {
protected
static
final
List
<
BrokerController
>
BROKER_CONTROLLERS
=
new
ArrayList
<>();
protected
static
final
List
<
NamesrvController
>
NAMESRV_CONTROLLERS
=
new
ArrayList
<>();
protected
static
int
topicCreateTime
=
30
*
1000
;
p
rotected
static
final
int
COMMIT_LOG_SIZE
=
1024
*
1024
*
100
;
p
ublic
static
volatile
int
COMMIT_LOG_SIZE
=
1024
*
1024
*
100
;
protected
static
final
int
INDEX_NUM
=
1000
;
private
static
final
AtomicInteger
port
=
new
AtomicInteger
(
40000
);
...
...
@@ -183,7 +183,7 @@ public class IntegrationTestBase {
}
public
static
boolean
initTopic
(
String
topic
,
String
nsAddr
,
String
clusterName
)
{
return
initTopic
(
topic
,
nsAddr
,
clusterName
,
8
);
return
initTopic
(
topic
,
nsAddr
,
clusterName
,
BaseConf
.
QUEUE_NUMBERS
);
}
public
static
void
deleteFile
(
File
file
)
{
...
...
test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
0 → 100644
浏览文件 @
0846c3c6
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.test.smoke
;
import
com.google.common.collect.ImmutableList
;
import
com.google.common.collect.Maps
;
import
java.nio.charset.StandardCharsets
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.Iterator
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Locale
;
import
java.util.Map
;
import
java.util.Queue
;
import
java.util.Set
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ForkJoinPool
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
java.util.stream.IntStream
;
import
org.apache.commons.lang3.RandomStringUtils
;
import
org.apache.commons.lang3.reflect.FieldUtils
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
;
import
org.apache.rocketmq.client.consumer.PullCallback
;
import
org.apache.rocketmq.client.consumer.PullResult
;
import
org.apache.rocketmq.client.consumer.PullStatus
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendCallback
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendResultForLogicalQueue
;
import
org.apache.rocketmq.common.MQVersion
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.namesrv.NamesrvController
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.CommitLog
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.MappedFileQueue
;
import
org.apache.rocketmq.test.base.BaseConf
;
import
org.apache.rocketmq.test.base.IntegrationTestBase
;
import
org.apache.rocketmq.test.util.MQRandomUtils
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl
;
import
org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand
;
import
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand
;
import
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand
;
import
org.junit.AfterClass
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.BeforeClass
;
import
org.junit.FixMethodOrder
;
import
org.junit.Test
;
import
org.junit.runners.MethodSorters
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
static
java
.
util
.
Optional
.
ofNullable
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThatThrownBy
;
import
static
org
.
awaitility
.
Awaitility
.
await
;
import
static
org
.
awaitility
.
Awaitility
.
waitAtMost
;
@FixMethodOrder
(
MethodSorters
.
NAME_ASCENDING
)
public
class
LogicalQueueIT
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
LogicalQueueIT
.
class
);
public
static
String
nsAddr
;
private
static
String
broker1Name
;
private
static
String
broker2Name
;
private
static
String
clusterName
;
private
static
int
brokerNum
;
private
final
static
int
QUEUE_NUMBERS
=
8
;
private
static
NamesrvController
namesrvController
;
private
static
BrokerController
brokerController1
;
private
static
BrokerController
brokerController2
;
private
static
Map
<
String
,
BrokerController
>
brokerControllerMap
;
private
final
static
List
<
Object
>
mqClients
=
new
ArrayList
<>();
private
static
DefaultMQProducer
producer
;
private
static
DefaultMQPullConsumer
consumer
;
private
static
DefaultMQAdminExt
mqAdminExt
;
private
static
volatile
String
topic
=
null
;
private
static
final
String
placeholderTopic
=
"placeholder"
;
private
static
final
int
MSG_SENT_TIMES
=
3
;
private
static
final
int
COMMIT_LOG_FILE_SIZE
=
512
*
1024
;
@BeforeClass
public
static
void
beforeClass
()
throws
Exception
{
System
.
setProperty
(
RemotingCommand
.
REMOTING_VERSION_KEY
,
Integer
.
toString
(
MQVersion
.
CURRENT_VERSION
));
namesrvController
=
IntegrationTestBase
.
createAndStartNamesrv
();
nsAddr
=
"127.0.0.1:"
+
namesrvController
.
getNettyServerConfig
().
getListenPort
();
int
oldCommitLogSize
=
IntegrationTestBase
.
COMMIT_LOG_SIZE
;
IntegrationTestBase
.
COMMIT_LOG_SIZE
=
COMMIT_LOG_FILE_SIZE
;
brokerController1
=
IntegrationTestBase
.
createAndStartBroker
(
nsAddr
);
brokerController2
=
IntegrationTestBase
.
createAndStartBroker
(
nsAddr
);
IntegrationTestBase
.
COMMIT_LOG_SIZE
=
oldCommitLogSize
;
clusterName
=
brokerController1
.
getBrokerConfig
().
getBrokerClusterName
();
broker1Name
=
brokerController1
.
getBrokerConfig
().
getBrokerName
();
broker2Name
=
brokerController2
.
getBrokerConfig
().
getBrokerName
();
brokerNum
=
2
;
brokerControllerMap
=
ImmutableList
.
of
(
brokerController1
,
brokerController2
).
stream
().
collect
(
Collectors
.
toMap
(
input
->
input
.
getBrokerConfig
().
getBrokerName
(),
Function
.
identity
()));
BaseConf
.
waitBrokerRegistered
(
nsAddr
,
clusterName
);
producer
=
new
DefaultMQProducer
(
MQRandomUtils
.
getRandomConsumerGroup
());
mqClients
.
add
(
producer
);
producer
.
setNamesrvAddr
(
nsAddr
);
producer
.
setCompressMsgBodyOverHowmuch
(
Integer
.
MAX_VALUE
);
producer
.
setSendMsgTimeout
(
1000
);
producer
.
start
();
consumer
=
new
DefaultMQPullConsumer
(
BaseConf
.
initConsumerGroup
());
mqClients
.
add
(
consumer
);
consumer
.
setNamesrvAddr
(
nsAddr
);
consumer
.
setConsumerPullTimeoutMillis
(
1000
);
consumer
.
start
();
mqAdminExt
=
new
DefaultMQAdminExt
(
1000
);
mqClients
.
add
(
mqAdminExt
);
mqAdminExt
.
setNamesrvAddr
(
nsAddr
);
mqAdminExt
.
start
();
mqAdminExt
.
createTopic
(
clusterName
,
placeholderTopic
,
1
);
}
@AfterClass
public
static
void
afterClass
()
{
BaseConf
.
shutdown
(
mqClients
);
brokerControllerMap
.
forEach
((
s
,
brokerController
)
->
brokerController
.
shutdown
());
ofNullable
(
namesrvController
).
ifPresent
(
obj
->
ForkJoinPool
.
commonPool
().
execute
(
obj:
:
shutdown
));
}
@Before
public
void
setUp
()
throws
Exception
{
topic
=
"tt-"
+
MQRandomUtils
.
getRandomTopic
();
logger
.
info
(
"use topic: {}"
,
topic
);
mqAdminExt
.
createTopic
(
clusterName
,
topic
,
QUEUE_NUMBERS
);
assertThat
(
mqAdminExt
.
examineTopicRouteInfo
(
topic
).
getBrokerDatas
()).
hasSize
(
brokerNum
);
await
().
atMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
!
mqAdminExt
.
examineTopicStats
(
topic
).
getOffsetTable
().
isEmpty
());
consumer
.
setRegisterTopics
(
Collections
.
singleton
(
topic
));
// consumer.setMessageQueueListener & consumer.registerMessageQueueListener are useless in DefaultMQPullConsumer, they will never work, so do not need to test it
new
UpdateTopicLogicalQueueMappingCommand
().
execute
(
mqAdminExt
,
topic
,
brokerControllerMap
.
values
().
stream
().
map
(
BrokerController:
:
getBrokerAddr
).
collect
(
Collectors
.
toSet
()));
}
private
static
String
getCurrentMethodName
()
{
// 0: getStackTrace
// 1: getCurrentMethodName
// 2: __realMethod__
return
Thread
.
currentThread
().
getStackTrace
()[
2
].
getMethodName
();
}
@Test
public
void
test001_SendPullSync
()
throws
Exception
{
String
methodName
=
getCurrentMethodName
();
List
<
MessageQueue
>
publishMessageQueues
=
producer
.
fetchPublishMessageQueues
(
topic
);
assertThat
(
publishMessageQueues
).
hasSize
(
brokerNum
*
QUEUE_NUMBERS
);
Set
<
Integer
>
queueIds
=
IntStream
.
range
(
0
,
brokerNum
*
QUEUE_NUMBERS
).
boxed
().
collect
(
Collectors
.
toSet
());
for
(
MessageQueue
messageQueue
:
publishMessageQueues
)
{
assertThat
(
messageQueue
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
queueIds
.
remove
(
messageQueue
.
getQueueId
())).
isTrue
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
messageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
messageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
}
}
assertThat
(
queueIds
).
isEmpty
();
List
<
MessageQueue
>
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
assertThat
(
subscribeMessageQueues
).
hasSize
(
brokerNum
*
QUEUE_NUMBERS
);
subscribeMessageQueues
.
sort
(
Comparator
.
comparingInt
(
MessageQueue:
:
getQueueId
));
queueIds
.
addAll
(
IntStream
.
range
(
0
,
brokerNum
*
QUEUE_NUMBERS
).
boxed
().
collect
(
Collectors
.
toSet
()));
for
(
MessageQueue
messageQueue
:
subscribeMessageQueues
)
{
assertThat
(
messageQueue
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
queueIds
.
remove
(
messageQueue
.
getQueueId
())).
isTrue
();
long
offset
=
mqAdminExt
.
minOffset
(
messageQueue
);
PullResult
pullResult
=
consumer
.
pull
(
messageQueue
,
"*"
,
offset
,
10
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMsgFoundList
()).
hasSize
(
MSG_SENT_TIMES
);
offset
=
-
1
;
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
pullResult
.
getMsgFoundList
().
get
(
i
);
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
));
if
(
i
>
0
)
{
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
+
i
);
}
else
{
offset
=
msg
.
getQueueOffset
();
}
}
assertThat
(
maxOffsetUncommitted
(
messageQueue
)).
isEqualTo
(
offset
+
MSG_SENT_TIMES
);
}
assertThat
(
queueIds
).
isEmpty
();
}
@Test
public
void
test002_SendPullAsync
()
throws
Exception
{
String
methodName
=
getCurrentMethodName
();
List
<
MessageQueue
>
publishMessageQueues
=
producer
.
fetchPublishMessageQueues
(
topic
);
for
(
MessageQueue
messageQueue
:
publishMessageQueues
)
{
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
CompletableFuture
<
SendResult
>
future
=
new
CompletableFuture
<>();
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-async-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
messageQueue
,
new
SendCallback
()
{
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
future
.
complete
(
sendResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
future
.
completeExceptionally
(
e
);
}
});
SendResult
sendResult
=
future
.
get
();
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
messageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
}
}
List
<
MessageQueue
>
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
for
(
MessageQueue
messageQueue
:
subscribeMessageQueues
)
{
long
offset
=
mqAdminExt
.
minOffset
(
messageQueue
);
CompletableFuture
<
PullResult
>
future
=
new
CompletableFuture
<>();
consumer
.
pull
(
messageQueue
,
"*"
,
offset
,
10
,
new
PullCallback
()
{
@Override
public
void
onSuccess
(
PullResult
pullResult
)
{
future
.
complete
(
pullResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
future
.
completeExceptionally
(
e
);
}
});
PullResult
pullResult
=
future
.
get
();
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMsgFoundList
()).
hasSize
(
MSG_SENT_TIMES
);
offset
=
-
1
;
Iterator
<
MessageExt
>
it
=
pullResult
.
getMsgFoundList
().
iterator
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
it
.
next
();
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-async-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
));
if
(
i
>
0
)
{
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
+
i
);
}
else
{
offset
=
msg
.
getQueueOffset
();
}
}
}
}
@Test
public
void
test003_MigrateOnceWithoutData
()
throws
Exception
{
final
String
methodName
=
getCurrentMethodName
();
final
int
logicalQueueIdx
=
1
;
TopicRouteData
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList1
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
LogicalQueueRouteData
lastLogicalQueueRouteData1
=
logicalQueueRouteDataList1
.
get
(
logicalQueueRouteDataList1
.
size
()
-
1
);
String
newBrokerName
;
if
(
lastLogicalQueueRouteData1
.
getBrokerName
().
equals
(
broker1Name
))
{
newBrokerName
=
broker2Name
;
}
else
{
newBrokerName
=
broker1Name
;
}
MessageQueue
migratedMessageQueue
=
new
MessageQueue
(
topic
,
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
,
logicalQueueIdx
);
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
newBrokerName
,
null
);
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
assertThat
(
topicRouteInfo
.
getLogicalQueuesInfo
()).
isNotNull
();
for
(
Map
.
Entry
<
Integer
,
List
<
LogicalQueueRouteData
>>
entry
:
topicRouteInfo
.
getLogicalQueuesInfo
().
entrySet
())
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList2
=
entry
.
getValue
();
if
(
entry
.
getKey
()
==
logicalQueueIdx
)
{
assertThat
(
logicalQueueRouteDataList2
).
hasSize
(
logicalQueueRouteDataList1
.
size
()
+
1
);
LogicalQueueRouteData
lastLogicalQueueRouteData2
=
logicalQueueRouteDataList2
.
get
(
logicalQueueRouteDataList2
.
size
()
-
2
);
assertThat
(
lastLogicalQueueRouteData2
.
getMessageQueue
()).
isEqualTo
(
lastLogicalQueueRouteData1
.
getMessageQueue
());
assertThat
(
lastLogicalQueueRouteData2
.
getOffsetMax
()).
isGreaterThanOrEqualTo
(
0L
);
assertThat
(
lastLogicalQueueRouteData2
.
getMessagesCount
()).
isEqualTo
(
0L
);
assertThat
(
lastLogicalQueueRouteData2
.
isWritable
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData2
.
isReadable
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData2
.
isExpired
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData2
.
getLogicalQueueDelta
()).
isEqualTo
(
0L
);
LogicalQueueRouteData
lastLogicalQueueRouteData3
=
logicalQueueRouteDataList2
.
get
(
logicalQueueRouteDataList2
.
size
()
-
1
);
assertThat
(
lastLogicalQueueRouteData3
.
getBrokerName
()).
isEqualTo
(
newBrokerName
);
assertThat
(
lastLogicalQueueRouteData3
.
getOffsetMax
()).
isLessThan
(
0L
);
assertThat
(
lastLogicalQueueRouteData3
.
isWritable
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData3
.
isReadable
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData3
.
isExpired
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData3
.
getLogicalQueueDelta
()).
isEqualTo
(
0L
);
}
else
{
assertThat
(
logicalQueueRouteDataList2
).
hasSize
(
1
);
LogicalQueueRouteData
logicalQueueRouteData
=
logicalQueueRouteDataList2
.
get
(
0
);
assertThat
(
logicalQueueRouteData
.
getOffsetMax
()).
isLessThan
(
0L
);
assertThat
(
logicalQueueRouteData
.
isWritable
()).
isTrue
();
assertThat
(
logicalQueueRouteData
.
isReadable
()).
isTrue
();
assertThat
(
logicalQueueRouteData
.
isExpired
()).
isFalse
();
assertThat
(
logicalQueueRouteData
.
getLogicalQueueDelta
()).
isEqualTo
(
0L
);
}
}
List
<
MessageQueue
>
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
assertThat
(
subscribeMessageQueues
).
hasSize
(
brokerNum
*
QUEUE_NUMBERS
);
for
(
MessageQueue
mq
:
subscribeMessageQueues
)
{
assertThat
(
mqAdminExt
.
minOffset
(
mq
)).
isEqualTo
(
0L
);
}
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migratedMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
newBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
QUEUE_NUMBERS
);
}
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
CompletableFuture
<
SendResult
>
future
=
new
CompletableFuture
<>();
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-async-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
,
new
SendCallback
()
{
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
future
.
complete
(
sendResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
future
.
completeExceptionally
(
e
);
}
});
SendResult
sendResult
=
future
.
get
();
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migratedMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
newBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
QUEUE_NUMBERS
);
}
assertThat
(
maxOffsetUncommitted
(
migratedMessageQueue
)).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
mqAdminExt
.
maxOffset
(
migratedMessageQueue
)
==
2
*
MSG_SENT_TIMES
);
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
0L
,
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
0
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
List
<
MessageExt
>
msgFoundList
=
pullResult
.
getMsgFoundList
();
assertThat
(
msgFoundList
).
hasSize
(
2
*
MSG_SENT_TIMES
);
Iterator
<
MessageExt
>
it
=
pullResult
.
getMsgFoundList
().
iterator
();
long
offset
=
0L
;
for
(
String
prefix
:
new
String
[]
{
"sync"
,
"async"
})
{
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
it
.
next
();
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%s-%d-%d"
,
methodName
,
prefix
,
migratedMessageQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
);
offset
++;
}
}
offset
=
pullResult
.
getNextBeginOffset
();
CompletableFuture
<
PullResult
>
future
=
new
CompletableFuture
<>();
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
10
,
new
PullCallback
()
{
@Override
public
void
onSuccess
(
PullResult
pullResult
)
{
future
.
complete
(
pullResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
future
.
completeExceptionally
(
e
);
}
});
pullResult
=
future
.
get
();
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
NO_NEW_MSG
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
0
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getMsgFoundList
()).
isNull
();
}
@Test
public
void
test004_MigrateOnceWithData
()
throws
Exception
{
final
String
methodName
=
getCurrentMethodName
();
final
int
logicalQueueIdx
=
1
;
TopicRouteData
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList1
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
LogicalQueueRouteData
lastLogicalQueueRouteData1
=
logicalQueueRouteDataList1
.
get
(
logicalQueueRouteDataList1
.
size
()
-
1
);
String
newBrokerName
;
if
(
lastLogicalQueueRouteData1
.
getBrokerName
().
equals
(
broker1Name
))
{
newBrokerName
=
broker2Name
;
}
else
{
newBrokerName
=
broker1Name
;
}
MessageQueue
migratedMessageQueue
=
new
MessageQueue
(
topic
,
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
,
logicalQueueIdx
);
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migratedMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
}
assertThat
(
maxOffsetUncommitted
(
migratedMessageQueue
)).
isEqualTo
(
MSG_SENT_TIMES
);
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
mqAdminExt
.
maxOffset
(
migratedMessageQueue
)
==
MSG_SENT_TIMES
);
{
long
offset
=
0L
;
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
0
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
List
<
MessageExt
>
msgFoundList
=
pullResult
.
getMsgFoundList
();
assertThat
(
msgFoundList
).
hasSize
(
MSG_SENT_TIMES
);
Iterator
<
MessageExt
>
it
=
pullResult
.
getMsgFoundList
().
iterator
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
it
.
next
();
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
);
offset
++;
}
}
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
newBrokerName
,
null
);
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
assertThat
(
topicRouteInfo
.
getLogicalQueuesInfo
()).
isNotNull
();
for
(
Map
.
Entry
<
Integer
,
List
<
LogicalQueueRouteData
>>
entry
:
topicRouteInfo
.
getLogicalQueuesInfo
().
entrySet
())
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList2
=
entry
.
getValue
();
if
(
entry
.
getKey
()
==
logicalQueueIdx
)
{
assertThat
(
logicalQueueRouteDataList2
).
hasSize
(
logicalQueueRouteDataList1
.
size
()
+
1
);
LogicalQueueRouteData
lastLogicalQueueRouteData2
=
logicalQueueRouteDataList2
.
get
(
logicalQueueRouteDataList2
.
size
()
-
2
);
assertThat
(
lastLogicalQueueRouteData2
.
getMessageQueue
()).
isEqualTo
(
lastLogicalQueueRouteData1
.
getMessageQueue
());
assertThat
(
lastLogicalQueueRouteData2
.
getOffsetMax
()).
isGreaterThanOrEqualTo
(
0L
);
assertThat
(
lastLogicalQueueRouteData2
.
getMessagesCount
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
lastLogicalQueueRouteData2
.
isWritable
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData2
.
isReadable
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData2
.
isExpired
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData2
.
getLogicalQueueDelta
()).
isEqualTo
(
0L
);
LogicalQueueRouteData
lastLogicalQueueRouteData3
=
logicalQueueRouteDataList2
.
get
(
logicalQueueRouteDataList2
.
size
()
-
1
);
assertThat
(
lastLogicalQueueRouteData3
.
getBrokerName
()).
isEqualTo
(
newBrokerName
);
assertThat
(
lastLogicalQueueRouteData3
.
getOffsetMax
()).
isLessThan
(
0L
);
assertThat
(
lastLogicalQueueRouteData3
.
isWritable
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData3
.
isReadable
()).
isTrue
();
assertThat
(
lastLogicalQueueRouteData3
.
isExpired
()).
isFalse
();
assertThat
(
lastLogicalQueueRouteData3
.
getLogicalQueueDelta
()).
isEqualTo
(
MSG_SENT_TIMES
);
}
else
{
assertThat
(
logicalQueueRouteDataList2
).
hasSize
(
1
);
LogicalQueueRouteData
logicalQueueRouteData
=
logicalQueueRouteDataList2
.
get
(
0
);
assertThat
(
logicalQueueRouteData
.
getOffsetMax
()).
isLessThan
(
0L
);
assertThat
(
logicalQueueRouteData
.
isWritable
()).
isTrue
();
assertThat
(
logicalQueueRouteData
.
isReadable
()).
isTrue
();
assertThat
(
logicalQueueRouteData
.
isExpired
()).
isFalse
();
assertThat
(
logicalQueueRouteData
.
getLogicalQueueDelta
()).
isEqualTo
(
0L
);
}
}
assertThat
(
migratedMessageQueue
).
isNotNull
();
List
<
MessageQueue
>
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
assertThat
(
subscribeMessageQueues
).
hasSize
(
brokerNum
*
QUEUE_NUMBERS
);
for
(
MessageQueue
mq
:
subscribeMessageQueues
)
{
assertThat
(
mqAdminExt
.
minOffset
(
mq
)).
isEqualTo
(
0L
);
}
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
CompletableFuture
<
SendResult
>
future
=
new
CompletableFuture
<>();
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-async-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
,
new
SendCallback
()
{
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
future
.
complete
(
sendResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
future
.
completeExceptionally
(
e
);
}
});
SendResult
sendResult
=
future
.
get
();
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migratedMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
newBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
QUEUE_NUMBERS
);
}
assertThat
(
maxOffsetUncommitted
(
migratedMessageQueue
)).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
mqAdminExt
.
maxOffset
(
migratedMessageQueue
)
==
2
*
MSG_SENT_TIMES
);
long
offset
=
0L
;
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
0
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
List
<
MessageExt
>
msgFoundList
=
pullResult
.
getMsgFoundList
();
assertThat
(
msgFoundList
).
hasSize
(
MSG_SENT_TIMES
);
Iterator
<
MessageExt
>
it
=
pullResult
.
getMsgFoundList
().
iterator
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
it
.
next
();
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-sync-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
);
offset
++;
}
offset
=
pullResult
.
getNextBeginOffset
();
CompletableFuture
<
PullResult
>
pullResultFuture
=
new
CompletableFuture
<>();
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
2
*
MSG_SENT_TIMES
,
new
PullCallback
()
{
@Override
public
void
onSuccess
(
PullResult
pullResult
)
{
pullResultFuture
.
complete
(
pullResult
);
}
@Override
public
void
onException
(
Throwable
e
)
{
pullResultFuture
.
completeExceptionally
(
e
);
}
});
pullResult
=
pullResultFuture
.
get
();
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
msgFoundList
=
pullResult
.
getMsgFoundList
();
assertThat
(
msgFoundList
).
hasSize
(
MSG_SENT_TIMES
);
it
=
pullResult
.
getMsgFoundList
().
iterator
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
it
.
next
();
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
migratedMessageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-async-%d-%d"
,
methodName
,
migratedMessageQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
);
offset
++;
}
offset
=
pullResult
.
getNextBeginOffset
();
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
10
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
NO_NEW_MSG
);
assertThat
(
pullResult
.
getMinOffset
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getMaxOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
pullResult
.
getMsgFoundList
()).
isNull
();
}
@Test
public
void
test005_MigrateWithDataBackAndForth
()
throws
Exception
{
final
String
methodName
=
getCurrentMethodName
();
final
int
logicalQueueIdx
=
1
;
MessageQueue
migratedMessageQueue
=
new
MessageQueue
(
topic
,
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
,
logicalQueueIdx
);
BrokerController
brokerController
;
TopicRouteData
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
LogicalQueueRouteData
lastLogicalQueueRouteData
;
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
lastLogicalQueueRouteData
=
logicalQueueRouteDataList
.
get
(
logicalQueueRouteDataList
.
size
()
-
1
);
}
final
String
fromBrokerName
,
toBrokerName
,
fromBrokerAddr
,
toBrokerAddr
;
if
(
lastLogicalQueueRouteData
.
getBrokerName
().
equals
(
broker1Name
))
{
fromBrokerName
=
broker1Name
;
fromBrokerAddr
=
brokerController1
.
getBrokerAddr
();
toBrokerName
=
broker2Name
;
toBrokerAddr
=
brokerController2
.
getBrokerAddr
();
}
else
{
fromBrokerName
=
broker2Name
;
fromBrokerAddr
=
brokerController2
.
getBrokerAddr
();
toBrokerName
=
broker1Name
;
toBrokerAddr
=
brokerController1
.
getBrokerAddr
();
}
int
msgIdx
=
0
;
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
msgIdx
++).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
fromBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
logicalQueueIdx
);
}
rotateBrokerCommitLog
(
brokerControllerMap
.
get
(
fromBrokerName
));
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
toBrokerName
,
null
);
{
LogicalQueuesInfo
info
;
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
;
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
fromBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
2
);
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
toBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
1
);
}
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
msgIdx
++).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
toBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
QUEUE_NUMBERS
);
}
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
fromBrokerName
,
null
);
// now will reuse queue with a ReadOnly one
{
LogicalQueuesInfo
info
;
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
;
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
fromBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
3
);
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
toBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
2
);
}
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
msgIdx
++).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
fromBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
logicalQueueIdx
);
}
LogicalQueueRouteData
logicalQueueRouteData1
;
LogicalQueueRouteData
logicalQueueRouteData2
;
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
;
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
logicalQueueRouteDataList
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
3
);
logicalQueueRouteData1
=
logicalQueueRouteDataList
.
get
(
0
);
assertThat
(
logicalQueueRouteData1
.
getLogicalQueueDelta
()).
isEqualTo
(
0
);
assertThat
(
logicalQueueRouteData1
.
isReadable
()).
isTrue
();
assertThat
(
logicalQueueRouteData1
.
isWritable
()).
isFalse
();
assertThat
(
logicalQueueRouteData1
.
isExpired
()).
isFalse
();
assertThat
(
logicalQueueRouteData1
.
isWriteOnly
()).
isFalse
();
assertThat
(
logicalQueueRouteData1
.
getBrokerName
()).
isEqualTo
(
fromBrokerName
);
assertThat
(
logicalQueueRouteData1
.
getOffsetMax
()).
isGreaterThanOrEqualTo
(
0L
);
assertThat
(
logicalQueueRouteData1
.
getMessagesCount
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
logicalQueueRouteData1
.
getFirstMsgTimeMillis
()).
isGreaterThan
(
0L
);
assertThat
(
logicalQueueRouteData1
.
getLastMsgTimeMillis
()).
isGreaterThan
(
0L
);
logicalQueueRouteData2
=
logicalQueueRouteDataList
.
get
(
1
);
assertThat
(
logicalQueueRouteData2
.
getLogicalQueueDelta
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
logicalQueueRouteData2
.
isReadable
()).
isTrue
();
assertThat
(
logicalQueueRouteData2
.
isWritable
()).
isFalse
();
assertThat
(
logicalQueueRouteData2
.
isExpired
()).
isFalse
();
assertThat
(
logicalQueueRouteData2
.
isWriteOnly
()).
isFalse
();
assertThat
(
logicalQueueRouteData2
.
getBrokerName
()).
isEqualTo
(
toBrokerName
);
assertThat
(
logicalQueueRouteData2
.
getOffsetMax
()).
isGreaterThanOrEqualTo
(
0L
);
assertThat
(
logicalQueueRouteData2
.
getMessagesCount
()).
isEqualTo
(
MSG_SENT_TIMES
);
assertThat
(
logicalQueueRouteData2
.
getFirstMsgTimeMillis
()).
isGreaterThan
(
0L
);
assertThat
(
logicalQueueRouteData2
.
getLastMsgTimeMillis
()).
isGreaterThan
(
0L
);
LogicalQueueRouteData
logicalQueueRouteData3
=
logicalQueueRouteDataList
.
get
(
2
);
assertThat
(
logicalQueueRouteData3
.
getLogicalQueueDelta
()).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
assertThat
(
logicalQueueRouteData3
.
isReadable
()).
isTrue
();
assertThat
(
logicalQueueRouteData3
.
isWritable
()).
isTrue
();
assertThat
(
logicalQueueRouteData3
.
isExpired
()).
isFalse
();
assertThat
(
logicalQueueRouteData3
.
isWriteOnly
()).
isFalse
();
assertThat
(
logicalQueueRouteData3
.
getBrokerName
()).
isEqualTo
(
fromBrokerName
);
assertThat
(
logicalQueueRouteData3
.
getOffsetMax
()).
isLessThan
(
0L
);
}
msgIdx
=
0
;
forLoop:
for
(
long
offset
=
0L
;
;
)
{
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
3
*
MSG_SENT_TIMES
);
switch
(
pullResult
.
getPullStatus
())
{
case
NO_NEW_MSG:
assertThat
(
offset
).
isGreaterThanOrEqualTo
(
3L
*
MSG_SENT_TIMES
);
break
forLoop
;
case
OFFSET_ILLEGAL:
offset
=
pullResult
.
getNextBeginOffset
();
break
;
default
:
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMsgFoundList
()).
isNotNull
();
assertThat
(
pullResult
.
getMsgFoundList
()).
hasSize
(
MSG_SENT_TIMES
);
for
(
MessageExt
msg
:
pullResult
.
getMsgFoundList
())
{
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
msgIdx
));
msgIdx
++;
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
);
offset
++;
}
offset
=
pullResult
.
getNextBeginOffset
();
break
;
}
}
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
maxOffsetUncommitted
(
logicalQueueRouteData1
.
getMessageQueue
())
==
mqAdminExt
.
maxOffset
(
logicalQueueRouteData1
.
getMessageQueue
()));
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
maxOffsetUncommitted
(
logicalQueueRouteData2
.
getMessageQueue
())
==
mqAdminExt
.
maxOffset
(
logicalQueueRouteData2
.
getMessageQueue
()));
// now verify after commit log cleaned, toBroker's first queue route data will be expired too
brokerController
=
brokerControllerMap
.
get
(
logicalQueueRouteData2
.
getBrokerName
());
rotateBrokerCommitLog
(
brokerController
);
deleteCommitLogFiles
(
brokerController
,
1
);
{
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
hasSize
(
2
);
assertThat
(
logicalQueueRouteDataList
.
get
(
0
)).
isEqualToIgnoringGivenFields
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
0
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
ReadOnly
,
0
,
3
,
-
1
,
-
1
,
fromBrokerAddr
),
"firstMsgTimeMillis"
,
"lastMsgTimeMillis"
);
assertThat
(
logicalQueueRouteDataList
.
get
(
1
)).
isEqualToComparingFieldByField
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
2
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
fromBrokerAddr
));
}
// try pull again, since there is an expired queue route in the middle.
{
int
msgCount
=
0
;
Queue
<
Integer
>
wantMsgIdx
=
new
LinkedList
<>();
wantMsgIdx
.
addAll
(
IntStream
.
range
(
0
,
MSG_SENT_TIMES
).
boxed
().
collect
(
Collectors
.
toList
()));
wantMsgIdx
.
addAll
(
IntStream
.
range
(
2
*
MSG_SENT_TIMES
,
3
*
MSG_SENT_TIMES
).
boxed
().
collect
(
Collectors
.
toList
()));
forLoop:
for
(
long
offset
=
mqAdminExt
.
minOffset
(
migratedMessageQueue
);
;
)
{
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
3
*
MSG_SENT_TIMES
);
switch
(
pullResult
.
getPullStatus
())
{
case
NO_NEW_MSG:
assertThat
(
msgCount
).
as
(
"offset=%d"
,
offset
).
isEqualTo
(
2
*
MSG_SENT_TIMES
);
break
forLoop
;
case
OFFSET_ILLEGAL:
offset
=
pullResult
.
getNextBeginOffset
();
break
;
case
FOUND:
msgCount
+=
pullResult
.
getMsgFoundList
().
size
();
boolean
first
=
true
;
for
(
MessageExt
msg
:
pullResult
.
getMsgFoundList
())
{
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
as
(
"offset=%d"
,
offset
).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
wantMsgIdx
.
poll
()));
if
(
first
)
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThanOrEqualTo
(
offset
);
first
=
false
;
}
else
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThan
(
offset
);
}
offset
=
msg
.
getQueueOffset
();
}
offset
=
pullResult
.
getNextBeginOffset
();
break
;
default
:
Assert
.
fail
(
String
.
format
(
Locale
.
ENGLISH
,
"unexpected pull offset=%d status: %s"
,
offset
,
pullResult
));
}
}
}
// rotate first queue route to expired, and pull it
brokerController
=
brokerControllerMap
.
get
(
logicalQueueRouteData1
.
getBrokerName
());
rotateBrokerCommitLog
(
brokerController
);
deleteCommitLogFiles
(
brokerController
,
2
);
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
;
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
logicalQueueRouteDataList
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
isEqualTo
(
Collections
.
singletonList
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
2
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
fromBrokerAddr
)));
}
{
int
msgCount
=
0
;
Queue
<
Integer
>
wantMsgIdx
=
new
LinkedList
<>();
wantMsgIdx
.
addAll
(
IntStream
.
range
(
2
*
MSG_SENT_TIMES
,
3
*
MSG_SENT_TIMES
).
boxed
().
collect
(
Collectors
.
toList
()));
forLoop:
for
(
long
offset
=
mqAdminExt
.
minOffset
(
migratedMessageQueue
);
;
)
{
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
3
*
MSG_SENT_TIMES
);
switch
(
pullResult
.
getPullStatus
())
{
case
NO_NEW_MSG:
if
(
msgCount
!=
MSG_SENT_TIMES
)
{
Assert
.
fail
(
String
.
format
(
Locale
.
ENGLISH
,
"want %d msg but got %d"
,
MSG_SENT_TIMES
,
msgCount
));
}
break
forLoop
;
case
OFFSET_ILLEGAL:
offset
=
pullResult
.
getNextBeginOffset
();
break
;
case
FOUND:
msgCount
+=
pullResult
.
getMsgFoundList
().
size
();
boolean
first
=
true
;
for
(
MessageExt
msg
:
pullResult
.
getMsgFoundList
())
{
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
as
(
"offset=%d"
,
offset
).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
wantMsgIdx
.
poll
()));
if
(
first
)
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThanOrEqualTo
(
offset
);
first
=
false
;
}
else
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThan
(
offset
);
}
offset
=
msg
.
getQueueOffset
();
}
offset
=
pullResult
.
getNextBeginOffset
();
break
;
default
:
Assert
.
fail
(
String
.
format
(
Locale
.
ENGLISH
,
"unexpected pull offset=%d status: %s"
,
offset
,
pullResult
));
}
}
}
brokerController
=
brokerControllerMap
.
get
(
fromBrokerName
);
rotateBrokerCommitLog
(
brokerController
);
deleteCommitLogFiles
(
brokerController
,
1
);
{
forLoop:
for
(
long
offset
=
mqAdminExt
.
minOffset
(
migratedMessageQueue
);
;
)
{
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
3
*
MSG_SENT_TIMES
);
// commit log rotate and cleaned, so there is no message.
switch
(
pullResult
.
getPullStatus
())
{
case
NO_MATCHED_MSG:
case
NO_NEW_MSG:
assertThat
(
pullResult
.
getNextBeginOffset
()).
isEqualTo
(
3
*
MSG_SENT_TIMES
);
break
forLoop
;
case
OFFSET_ILLEGAL:
offset
=
pullResult
.
getNextBeginOffset
();
break
;
default
:
Assert
.
fail
(
String
.
format
(
Locale
.
ENGLISH
,
"unexpected pull offset=%d status: %s"
,
offset
,
pullResult
));
}
}
}
{
LogicalQueuesInfo
logicalQueuesInfo
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
brokerController
.
getBrokerAddr
(),
topic
);
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
=
logicalQueuesInfo
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
isEqualTo
(
Collections
.
singletonList
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
2
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
fromBrokerAddr
)));
}
// try migrate to this broker which has a expired queue, expect it will reuse the expired one, pull it to verify if delta works well
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
toBrokerName
,
null
);
{
List
<
LogicalQueueRouteData
>
logicalQueueRouteDataList
;
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
logicalQueueRouteDataList
=
topicRouteInfo
.
getLogicalQueuesInfo
().
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
isEqualTo
(
Arrays
.
asList
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
2
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
Expired
,
MSG_SENT_TIMES
,
2
*
MSG_SENT_TIMES
,
0
,
0
,
fromBrokerAddr
)
,
new
LogicalQueueRouteData
(
logicalQueueIdx
,
3
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
toBrokerName
,
QUEUE_NUMBERS
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
toBrokerAddr
)
));
LogicalQueuesInfo
info
;
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
fromBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
isEqualTo
(
Arrays
.
asList
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
2
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
fromBrokerName
,
logicalQueueIdx
),
MessageQueueRouteState
.
Expired
,
MSG_SENT_TIMES
,
2
*
MSG_SENT_TIMES
,
0
,
0
,
fromBrokerAddr
)
,
new
LogicalQueueRouteData
(
logicalQueueIdx
,
3
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
toBrokerName
,
QUEUE_NUMBERS
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
toBrokerAddr
)
));
info
=
mqAdminExt
.
queryTopicLogicalQueueMapping
(
toBrokerAddr
,
topic
);
logicalQueueRouteDataList
=
info
.
get
(
logicalQueueIdx
);
assertThat
(
logicalQueueRouteDataList
).
isEqualTo
(
Collections
.
singletonList
(
new
LogicalQueueRouteData
(
logicalQueueIdx
,
3
*
MSG_SENT_TIMES
,
new
MessageQueue
(
topic
,
toBrokerName
,
QUEUE_NUMBERS
),
MessageQueueRouteState
.
Normal
,
MSG_SENT_TIMES
,
-
1
,
-
1
,
-
1
,
toBrokerAddr
)));
}
msgIdx
=
3
*
MSG_SENT_TIMES
;
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
msgIdx
++).
getBytes
(
StandardCharsets
.
UTF_8
)),
migratedMessageQueue
);
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
toBrokerName
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
QUEUE_NUMBERS
);
}
{
int
msgCount
=
0
;
Queue
<
Integer
>
wantMsgIdx
=
new
LinkedList
<>();
wantMsgIdx
.
addAll
(
IntStream
.
range
(
3
*
MSG_SENT_TIMES
,
4
*
MSG_SENT_TIMES
).
boxed
().
collect
(
Collectors
.
toList
()));
LOOP:
for
(
long
offset
=
0L
;
;
)
{
PullResult
pullResult
=
consumer
.
pull
(
migratedMessageQueue
,
"*"
,
offset
,
3
*
MSG_SENT_TIMES
);
switch
(
pullResult
.
getPullStatus
())
{
case
NO_NEW_MSG:
assertThat
(
msgCount
).
as
(
"msgCount with offset=%d"
,
offset
).
isEqualTo
(
MSG_SENT_TIMES
);
break
LOOP
;
case
OFFSET_ILLEGAL:
assertThat
(
pullResult
.
getNextBeginOffset
()).
isNotEqualTo
(
Long
.
MIN_VALUE
);
offset
=
pullResult
.
getNextBeginOffset
();
break
;
case
FOUND:
msgCount
+=
pullResult
.
getMsgFoundList
().
size
();
boolean
first
=
true
;
for
(
MessageExt
msg
:
pullResult
.
getMsgFoundList
())
{
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
as
(
"offset=%d"
,
offset
).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
logicalQueueIdx
,
wantMsgIdx
.
poll
()));
if
(
first
)
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThanOrEqualTo
(
offset
);
first
=
false
;
}
else
{
assertThat
(
msg
.
getQueueOffset
()).
isGreaterThan
(
offset
);
}
offset
=
msg
.
getQueueOffset
();
}
offset
=
pullResult
.
getNextBeginOffset
();
break
;
default
:
Assert
.
fail
(
String
.
format
(
Locale
.
ENGLISH
,
"unexpected pull offset=%d status: %s"
,
offset
,
pullResult
));
}
}
}
}
@Test
public
void
test006_LogicalQueueNumChanged
()
throws
Exception
{
String
methodName
=
getCurrentMethodName
();
int
logicalQueueNum
=
brokerNum
*
QUEUE_NUMBERS
;
List
<
MessageQueue
>
publishMessageQueues
;
publishMessageQueues
=
producer
.
fetchPublishMessageQueues
(
topic
);
assertThat
(
publishMessageQueues
).
hasSize
(
logicalQueueNum
);
List
<
MessageQueue
>
subscribeMessageQueues
;
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
assertThat
(
subscribeMessageQueues
).
hasSize
(
logicalQueueNum
);
logicalQueueNum
++;
new
UpdateTopicLogicalQueueNumCommand
().
execute
(
mqAdminExt
,
clusterName
,
topic
,
logicalQueueNum
);
int
newAddLogicalQueueIdx
=
logicalQueueNum
-
1
;
MessageQueue
newAddLogicalQueue
=
new
MessageQueue
(
topic
,
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
,
newAddLogicalQueueIdx
);
String
newAddLogicalQueueBrokerName
;
{
TopicRouteData
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
LogicalQueuesInfo
info
=
topicRouteInfo
.
getLogicalQueuesInfo
();
assertThat
(
info
).
isNotNull
();
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
info
.
get
(
newAddLogicalQueueIdx
);
assertThat
(
queueRouteDataList
).
isNotNull
();
assertThat
(
queueRouteDataList
).
hasSize
(
1
);
LogicalQueueRouteData
queueRouteData
=
queueRouteDataList
.
get
(
0
);
newAddLogicalQueueBrokerName
=
queueRouteData
.
getBrokerName
();
assertThat
(
queueRouteData
.
getState
()).
isEqualTo
(
MessageQueueRouteState
.
Normal
);
assertThat
(
queueRouteData
.
getLogicalQueueDelta
()).
isEqualTo
(
0
);
assertThat
(
queueRouteData
.
getLogicalQueueIndex
()).
isEqualTo
(
newAddLogicalQueueIdx
);
}
publishMessageQueues
=
producer
.
fetchPublishMessageQueues
(
topic
);
assertThat
(
publishMessageQueues
).
hasSize
(
logicalQueueNum
);
Set
<
Integer
>
logicalQueueIds
=
IntStream
.
range
(
0
,
logicalQueueNum
).
boxed
().
collect
(
Collectors
.
toSet
());
Map
<
String
,
Set
<
Integer
>>
queueIds
=
Maps
.
newHashMap
();
for
(
String
brokerName
:
Arrays
.
asList
(
broker1Name
,
broker2Name
))
{
queueIds
.
put
(
brokerName
,
IntStream
.
range
(
0
,
QUEUE_NUMBERS
).
boxed
().
collect
(
Collectors
.
toSet
()));
}
queueIds
.
get
(
newAddLogicalQueueBrokerName
).
add
(
QUEUE_NUMBERS
);
for
(
MessageQueue
messageQueue
:
publishMessageQueues
)
{
assertThat
(
messageQueue
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
logicalQueueIds
.
remove
(
messageQueue
.
getQueueId
())).
isTrue
();
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
messageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
messageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
if
(
i
==
0
)
{
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
queueIds
.
get
(
sendResult2
.
getOrigBrokerName
()).
remove
(
sendResult2
.
getOrigQueueId
())).
as
(
"brokerName %s queueId %d"
,
sendResult2
.
getOrigBrokerName
(),
sendResult2
.
getOrigQueueId
()).
isTrue
();
}
}
}
assertThat
(
logicalQueueIds
).
isEmpty
();
subscribeMessageQueues
=
consumer
.
fetchSubscribeMessageQueues
(
topic
).
stream
().
sorted
().
collect
(
Collectors
.
toList
());
assertThat
(
subscribeMessageQueues
).
hasSize
(
logicalQueueNum
);
subscribeMessageQueues
.
sort
(
Comparator
.
comparingInt
(
MessageQueue:
:
getQueueId
));
logicalQueueIds
.
addAll
(
IntStream
.
range
(
0
,
logicalQueueNum
).
boxed
().
collect
(
Collectors
.
toSet
()));
for
(
MessageQueue
messageQueue
:
subscribeMessageQueues
)
{
assertThat
(
messageQueue
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
logicalQueueIds
.
remove
(
messageQueue
.
getQueueId
())).
isTrue
();
long
offset
=
mqAdminExt
.
minOffset
(
messageQueue
);
assertThat
(
offset
).
isEqualTo
(
0
);
PullResult
pullResult
=
consumer
.
pull
(
messageQueue
,
"*"
,
offset
,
10
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMsgFoundList
()).
hasSize
(
MSG_SENT_TIMES
);
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
pullResult
.
getMsgFoundList
().
get
(
i
);
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
messageQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
messageQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
+
i
);
}
assertThat
(
maxOffsetUncommitted
(
messageQueue
)).
isEqualTo
(
offset
+
MSG_SENT_TIMES
);
}
assertThat
(
logicalQueueIds
).
isEmpty
();
// increase TopicConfig write queue first then increase logical queue, expect to reuse
String
broker2Addr
=
brokerController2
.
getBrokerAddr
();
TopicConfig
topicConfig
=
mqAdminExt
.
examineTopicConfig
(
broker2Addr
,
topic
);
topicConfig
.
setWriteQueueNums
(
topicConfig
.
getWriteQueueNums
()
+
1
);
topicConfig
.
setReadQueueNums
(
topicConfig
.
getReadQueueNums
()
+
1
);
mqAdminExt
.
createAndUpdateTopicConfig
(
broker2Addr
,
topicConfig
);
logicalQueueNum
++;
new
UpdateTopicLogicalQueueNumCommand
().
execute
(
mqAdminExt
,
clusterName
,
topic
,
logicalQueueNum
);
{
newAddLogicalQueueIdx
=
logicalQueueNum
-
1
;
TopicRouteData
topicRouteInfo
=
mqAdminExt
.
examineTopicRouteInfo
(
topic
);
LogicalQueuesInfo
info
=
topicRouteInfo
.
getLogicalQueuesInfo
();
assertThat
(
info
).
isNotNull
();
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
info
.
get
(
newAddLogicalQueueIdx
);
assertThat
(
queueRouteDataList
).
isNotNull
();
assertThat
(
queueRouteDataList
).
hasSize
(
1
);
LogicalQueueRouteData
queueRouteData
=
queueRouteDataList
.
get
(
0
);
assertThat
(
queueRouteData
.
getState
()).
isEqualTo
(
MessageQueueRouteState
.
Normal
);
assertThat
(
queueRouteData
.
getLogicalQueueDelta
()).
isEqualTo
(
0
);
assertThat
(
queueRouteData
.
getLogicalQueueIndex
()).
isEqualTo
(
newAddLogicalQueueIdx
);
assertThat
(
queueRouteData
.
getBrokerName
()).
isEqualTo
(
broker2Name
);
assertThat
(
queueRouteData
.
getQueueId
()).
isEqualTo
(
topicConfig
.
getWriteQueueNums
()
-
1
);
}
logicalQueueNum
-=
2
;
new
UpdateTopicLogicalQueueNumCommand
().
execute
(
mqAdminExt
,
clusterName
,
topic
,
logicalQueueNum
);
try
{
producer
.
send
(
new
Message
(
topic
,
"aaa"
.
getBytes
(
StandardCharsets
.
UTF_8
)),
newAddLogicalQueue
);
Assert
.
fail
(
"write to decreased logical queue success, want it failed"
);
}
catch
(
MQBrokerException
e
)
{
assertThat
(
e
.
getResponseCode
()).
isEqualTo
(
ResponseCode
.
NO_PERMISSION
);
}
{
int
offset
=
0
;
PullResult
pullResult
=
consumer
.
pull
(
newAddLogicalQueue
,
"*"
,
offset
,
10
);
assertThat
(
pullResult
.
getPullStatus
()).
isEqualTo
(
PullStatus
.
FOUND
);
assertThat
(
pullResult
.
getMsgFoundList
()).
hasSize
(
MSG_SENT_TIMES
);
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
MessageExt
msg
=
pullResult
.
getMsgFoundList
().
get
(
i
);
assertThat
(
msg
.
getBrokerName
()).
isEqualTo
(
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
);
assertThat
(
msg
.
getQueueId
()).
isEqualTo
(
newAddLogicalQueue
.
getQueueId
());
assertThat
(
new
String
(
msg
.
getBody
(),
StandardCharsets
.
UTF_8
)).
isEqualTo
(
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
newAddLogicalQueue
.
getQueueId
(),
i
));
assertThat
(
msg
.
getQueueOffset
()).
isEqualTo
(
offset
+
i
);
}
}
// rotate to remove new add queue's data, and try pull again
{
BrokerController
brokerController
=
brokerControllerMap
.
get
(
newAddLogicalQueueBrokerName
);
rotateBrokerCommitLog
(
brokerController
);
deleteCommitLogFiles
(
brokerController
,
1
);
}
{
int
offset
=
0
;
PullResult
pullResult
=
consumer
.
pull
(
newAddLogicalQueue
,
"*"
,
offset
,
10
);
assertThat
(
pullResult
.
getPullStatus
()).
isIn
(
PullStatus
.
NO_NEW_MSG
,
PullStatus
.
NO_MATCHED_MSG
);
}
}
@Test
public
void
test007_LogicalQueueWritableEvenBrokerDown
()
throws
Exception
{
final
String
methodName
=
getCurrentMethodName
();
final
int
logicalQueueIdx
=
1
;
BrokerController
brokerController3
=
IntegrationTestBase
.
createAndStartBroker
(
nsAddr
);
String
broker3Name
=
brokerController3
.
getBrokerConfig
().
getBrokerName
();
brokerControllerMap
.
put
(
broker3Name
,
brokerController3
);
await
().
atMost
(
30
,
TimeUnit
.
SECONDS
).
until
(()
->
mqAdminExt
.
examineBrokerClusterInfo
().
getBrokerAddrTable
().
containsKey
(
broker3Name
));
mqAdminExt
.
createAndUpdateTopicConfig
(
brokerController3
.
getBrokerAddr
(),
new
TopicConfig
(
topic
,
0
,
0
,
PermName
.
PERM_READ
|
PermName
.
PERM_WRITE
));
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
brokerController3
.
getBrokerConfig
().
getBrokerName
(),
null
);
MessageQueue
migrateMessageQueue
=
new
MessageQueue
(
topic
,
MixAll
.
LOGICAL_QUEUE_MOCK_BROKER_NAME
,
logicalQueueIdx
);
{
for
(
int
i
=
0
;
i
<
MSG_SENT_TIMES
;
i
++)
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
String
.
format
(
Locale
.
ENGLISH
,
"%s-%d-%d"
,
methodName
,
migrateMessageQueue
.
getQueueId
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
)),
migrateMessageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migrateMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migrateMessageQueue
.
getQueueId
());
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
broker3Name
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isEqualTo
(
0
);
}
}
brokerController3
.
shutdown
();
brokerControllerMap
.
remove
(
broker3Name
);
assertThatThrownBy
(()
->
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
"aaa"
.
getBytes
(
StandardCharsets
.
UTF_8
)),
migrateMessageQueue
);
logger
.
error
(
"send should fail but got {}"
,
sendResult
);
}).
isInstanceOf
(
RemotingException
.
class
).
hasMessageMatching
(
"connect to [0-9.:]+ failed"
);
assertThatThrownBy
(()
->
{
new
MigrateTopicLogicalQueueCommand
().
execute
(
mqAdminExt
,
topic
,
logicalQueueIdx
,
broker1Name
,
null
);
}).
hasRootCauseInstanceOf
(
RemotingConnectException
.
class
).
hasMessageContaining
(
"migrateTopicLogicalQueuePrepare"
);
{
SendResult
sendResult
=
producer
.
send
(
new
Message
(
topic
,
"aaa"
.
getBytes
(
StandardCharsets
.
UTF_8
)),
migrateMessageQueue
);
assertThat
(
sendResult
.
getMessageQueue
().
getBrokerName
()).
isEqualTo
(
migrateMessageQueue
.
getBrokerName
());
assertThat
(
sendResult
.
getMessageQueue
().
getQueueId
()).
isEqualTo
(
migrateMessageQueue
.
getQueueId
());
assertThat
(
sendResult
.
getQueueOffset
()).
isEqualTo
(-
1
);
SendResultForLogicalQueue
sendResult2
=
(
SendResultForLogicalQueue
)
sendResult
;
assertThat
(
sendResult2
.
getOrigBrokerName
()).
isEqualTo
(
broker1Name
);
assertThat
(
sendResult2
.
getOrigQueueId
()).
isIn
(
/* CommitLog not rotated, will not reuse */
QUEUE_NUMBERS
,
/* CommitLog rotated in other test cases, will reuse */
logicalQueueIdx
);
}
}
private
static
String
getBrokerCommitLogFileName
(
BrokerController
brokerController
)
throws
IllegalAccessException
{
DefaultMessageStore
defaultMessageStore
=
(
DefaultMessageStore
)
brokerController
.
getMessageStore
();
MappedFileQueue
mfq
=
(
MappedFileQueue
)
FieldUtils
.
readDeclaredField
(
defaultMessageStore
.
getCommitLog
(),
"mappedFileQueue"
,
true
);
return
mfq
.
getLastMappedFile
().
getFileName
();
}
private
static
void
deleteCommitLogFiles
(
BrokerController
brokerController
,
int
keepNum
)
throws
IllegalAccessException
{
CommitLog
commitLog
=
((
DefaultMessageStore
)
brokerController
.
getMessageStore
()).
getCommitLog
();
commitLog
.
flush
();
MappedFileQueue
mfq
=
(
MappedFileQueue
)
FieldUtils
.
readDeclaredField
(
commitLog
,
"mappedFileQueue"
,
true
);
AtomicInteger
count
=
new
AtomicInteger
();
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
{
count
.
getAndAdd
(
commitLog
.
deleteExpiredFile
(
0
,
0
,
5000
,
true
,
1
));
return
mfq
.
getMappedFiles
().
size
()
<=
keepNum
;
});
brokerController
.
getTopicConfigManager
().
getLogicalQueueCleanHook
().
execute
((
DefaultMessageStore
)
brokerController
.
getMessageStore
(),
count
.
get
());
logger
.
info
(
"deleteCommitLogFiles {} count {}"
,
brokerController
.
getBrokerConfig
().
getBrokerName
(),
count
.
get
());
}
private
static
void
rotateBrokerCommitLog
(
BrokerController
brokerController
)
throws
IllegalAccessException
{
CommitLog
commitLog
=
((
DefaultMessageStore
)
brokerController
.
getMessageStore
()).
getCommitLog
();
commitLog
.
flush
();
String
brokerName
=
brokerController
.
getBrokerConfig
().
getBrokerName
();
String
fileName1
=
getBrokerCommitLogFileName
(
brokerController
);
logger
.
info
(
"rotateBrokerCommitLog {} first {}"
,
brokerName
,
fileName1
);
int
msgSize
=
4
*
1024
;
byte
[]
data
=
RandomStringUtils
.
randomAscii
(
msgSize
).
getBytes
(
StandardCharsets
.
UTF_8
);
Message
msg
=
new
Message
(
placeholderTopic
,
data
);
MessageQueue
mq
=
new
MessageQueue
(
placeholderTopic
,
brokerName
,
0
);
waitAtMost
(
5
,
TimeUnit
.
SECONDS
).
until
(()
->
{
for
(
int
i
=
0
;
i
<
128
;
i
++)
{
producer
.
send
(
msg
,
mq
);
}
commitLog
.
flush
();
String
fileName2
=
getBrokerCommitLogFileName
(
brokerController
);
if
(!
fileName1
.
equals
(
fileName2
))
{
logger
.
info
(
"rotateBrokerCommitLog {} 4K msg last {}"
,
brokerName
,
fileName2
);
return
true
;
}
return
false
;
});
}
private
long
maxOffsetUncommitted
(
MessageQueue
mq
)
throws
IllegalAccessException
,
MQClientException
{
DefaultMQAdminExtImpl
defaultMQAdminExtImpl
=
(
DefaultMQAdminExtImpl
)
FieldUtils
.
readDeclaredField
(
mqAdminExt
,
"defaultMQAdminExtImpl"
,
true
);
return
defaultMQAdminExtImpl
.
maxOffset
(
mq
,
false
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录