Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
8847fb82
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
8847fb82
编写于
3月 09, 2021
作者:
H
hill007299
提交者:
xxxxx
4月 20, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-19] Pop Consuming (tools)
上级
9d8f4c2c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
185 addition
and
19 deletion
+185
-19
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
...va/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+12
-3
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+15
-5
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
...main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+20
-10
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
...ava/org/apache/rocketmq/tools/command/MQAdminStartup.java
+3
-1
tools/src/main/java/org/apache/rocketmq/tools/command/consumer/SetConsumeModeSubCommand.java
...etmq/tools/command/consumer/SetConsumeModeSubCommand.java
+135
-0
未找到文件。
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
浏览文件 @
8847fb82
...
...
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.common.protocol.body.BrokerStatsData
;
import
org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
...
...
@@ -546,14 +547,22 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
@Override
public
boolean
resumeCheckHalfMessage
(
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
return
this
.
defaultMQAdminExtImpl
.
resumeCheckHalfMessage
(
msgId
);
}
@Override
public
boolean
resumeCheckHalfMessage
(
String
topic
,
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
return
this
.
defaultMQAdminExtImpl
.
resumeCheckHalfMessage
(
topic
,
msgId
);
}
@Override
public
void
setMessageRequestMode
(
final
String
brokerAddr
,
final
String
topic
,
final
String
consumerGroup
,
final
MessageRequestMode
mode
,
final
int
popShareQueueNum
,
final
long
timeoutMillis
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQClientException
{
this
.
defaultMQAdminExtImpl
.
setMessageRequestMode
(
brokerAddr
,
topic
,
consumerGroup
,
mode
,
popShareQueueNum
,
timeoutMillis
);
}
}
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
8847fb82
...
...
@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
import
org.apache.rocketmq.client.impl.MQClientManager
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.ServiceState
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.AclConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
import
org.apache.rocketmq.common.admin.OffsetWrapper
;
...
...
@@ -49,16 +49,16 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import
org.apache.rocketmq.common.admin.TopicOffset
;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.common.message.MessageClientExt
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.common.namesrv.NamesrvUtil
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.BrokerStatsData
;
import
org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult
;
import
org.apache.rocketmq.common.protocol.body.ConsumeStatsList
;
...
...
@@ -78,6 +78,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import
org.apache.rocketmq.common.protocol.route.QueueData
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.common.RemotingUtil
;
...
...
@@ -1035,14 +1036,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public
boolean
resumeCheckHalfMessage
(
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
MessageExt
msg
=
this
.
viewMessage
(
msgId
);
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
resumeCheckHalfMessage
(
RemotingUtil
.
socketAddress2String
(
msg
.
getStoreHost
()),
msgId
,
timeoutMillis
);
}
@Override
public
boolean
resumeCheckHalfMessage
(
final
String
topic
,
final
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
public
boolean
resumeCheckHalfMessage
(
final
String
topic
,
final
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
{
MessageExt
msg
=
this
.
viewMessage
(
topic
,
msgId
);
if
(
msg
.
getProperty
(
MessageConst
.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
)
==
null
)
{
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
resumeCheckHalfMessage
(
RemotingUtil
.
socketAddress2String
(
msg
.
getStoreHost
()),
msgId
,
timeoutMillis
);
...
...
@@ -1051,4 +1053,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
resumeCheckHalfMessage
(
RemotingUtil
.
socketAddress2String
(
msg
.
getStoreHost
()),
msgClient
.
getOffsetMsgId
(),
timeoutMillis
);
}
}
@Override
public
void
setMessageRequestMode
(
final
String
brokerAddr
,
final
String
topic
,
final
String
consumerGroup
,
final
MessageRequestMode
mode
,
final
int
popShareQueueNum
,
final
long
timeoutMillis
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQClientException
{
this
.
mqClientInstance
.
getMQClientAPIImpl
().
setMessageRequestMode
(
brokerAddr
,
topic
,
consumerGroup
,
mode
,
popShareQueueNum
,
timeoutMillis
);
}
}
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
浏览文件 @
8847fb82
...
...
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import
org.apache.rocketmq.common.admin.TopicStatsTable
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.common.protocol.body.BrokerStatsData
;
import
org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
...
...
@@ -71,16 +72,19 @@ public interface MQAdminExt extends MQAdmin {
final
TopicConfig
config
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
createAndUpdatePlainAccessConfig
(
final
String
addr
,
final
PlainAccessConfig
plainAccessConfig
)
throws
RemotingException
,
MQBrokerException
,
void
createAndUpdatePlainAccessConfig
(
final
String
addr
,
final
PlainAccessConfig
plainAccessConfig
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
deletePlainAccessConfig
(
final
String
addr
,
final
String
accessKey
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
void
updateGlobalWhiteAddrConfig
(
final
String
addr
,
final
String
globalWhiteAddrs
)
throws
RemotingException
,
MQBrokerException
,
void
updateGlobalWhiteAddrConfig
(
final
String
addr
,
final
String
globalWhiteAddrs
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
ClusterAclVersionInfo
examineBrokerClusterAclVersionInfo
(
final
String
addr
)
throws
RemotingException
,
MQBrokerException
,
ClusterAclVersionInfo
examineBrokerClusterAclVersionInfo
(
final
String
addr
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
AclConfig
examineBrokerClusterAclConfig
(
final
String
addr
)
throws
RemotingException
,
MQBrokerException
,
...
...
@@ -266,11 +270,11 @@ public interface MQAdminExt extends MQAdmin {
/**
* query consume queue data
*
* @param brokerAddr broker ip address
* @param topic topic
* @param queueId id of queue
* @param index start offset
* @param count how many
* @param brokerAddr
broker ip address
* @param topic
topic
* @param queueId
id of queue
* @param index
start offset
* @param count
how many
* @param consumerGroup group
*/
QueryConsumeQueueResponseBody
queryConsumeQueue
(
final
String
brokerAddr
,
...
...
@@ -279,7 +283,13 @@ public interface MQAdminExt extends MQAdmin {
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQClientException
;
boolean
resumeCheckHalfMessage
(
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
;
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
;
boolean
resumeCheckHalfMessage
(
final
String
topic
,
final
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
;
boolean
resumeCheckHalfMessage
(
final
String
topic
,
final
String
msgId
)
throws
RemotingException
,
MQClientException
,
InterruptedException
,
MQBrokerException
;
void
setMessageRequestMode
(
final
String
brokerAddr
,
final
String
topic
,
final
String
consumerGroup
,
final
MessageRequestMode
mode
,
final
int
popWorkGroupSize
,
final
long
timeoutMillis
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQClientException
;
}
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
浏览文件 @
8847fb82
...
...
@@ -31,8 +31,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand
;
import
org.apache.rocketmq.tools.command.acl.GetAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.GetAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand
;
import
org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand
;
import
org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad
;
...
...
@@ -49,6 +49,7 @@ import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand
import
org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
;
import
org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand
;
import
org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand
;
import
org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand
;
...
...
@@ -152,6 +153,7 @@ public class MQAdminStartup {
initCommand
(
new
UpdateTopicSubCommand
());
initCommand
(
new
DeleteTopicSubCommand
());
initCommand
(
new
UpdateSubGroupSubCommand
());
initCommand
(
new
SetConsumeModeSubCommand
());
initCommand
(
new
DeleteSubscriptionGroupCommand
());
initCommand
(
new
UpdateBrokerConfigSubCommand
());
initCommand
(
new
UpdateTopicPermSubCommand
());
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/consumer/SetConsumeModeSubCommand.java
0 → 100644
浏览文件 @
8847fb82
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.consumer
;
import
java.util.Set
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.command.CommandUtil
;
import
org.apache.rocketmq.tools.command.SubCommand
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
public
class
SetConsumeModeSubCommand
implements
SubCommand
{
@Override
public
String
commandName
()
{
return
"setConsumeMode"
;
}
@Override
public
String
commandDesc
()
{
return
"set consume message mode. pull/pop etc."
;
}
@Override
public
Options
buildCommandlineOptions
(
Options
options
)
{
Option
opt
=
new
Option
(
"b"
,
"brokerAddr"
,
true
,
"create subscription group to which broker"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"c"
,
"clusterName"
,
true
,
"create subscription group to which cluster"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"t"
,
"topicName"
,
true
,
"topic name"
);
opt
.
setRequired
(
true
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"g"
,
"groupName"
,
true
,
"consumer group name"
);
opt
.
setRequired
(
true
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"m"
,
"mode"
,
true
,
"consume mode. PULL/POP"
);
opt
.
setRequired
(
true
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"n"
,
"popShareQueueNum"
,
true
,
"num fo queue which share in pop mode"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
return
options
;
}
@Override
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
DefaultMQAdminExt
defaultMQAdminExt
=
new
DefaultMQAdminExt
(
rpcHook
);
defaultMQAdminExt
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
defaultMQAdminExt
.
setVipChannelEnabled
(
false
);
try
{
String
topicName
=
commandLine
.
getOptionValue
(
't'
).
trim
();
String
groupName
=
commandLine
.
getOptionValue
(
'g'
).
trim
();
MessageRequestMode
mode
=
MessageRequestMode
.
valueOf
(
commandLine
.
getOptionValue
(
'm'
).
trim
());
int
popShareQueueNum
=
0
;
if
(
commandLine
.
hasOption
(
'n'
))
{
popShareQueueNum
=
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
'n'
)
.
trim
());
}
if
(
commandLine
.
hasOption
(
'b'
))
{
String
addr
=
commandLine
.
getOptionValue
(
'b'
).
trim
();
defaultMQAdminExt
.
start
();
defaultMQAdminExt
.
setMessageRequestMode
(
addr
,
topicName
,
groupName
,
mode
,
popShareQueueNum
,
5000
);
System
.
out
.
printf
(
"set consume mode to %s success.%n"
,
addr
);
System
.
out
.
printf
(
"topic[%s] group[%s] consume mode[%s] popShareQueueNum[%d]"
,
topicName
,
groupName
,
mode
.
toString
(),
popShareQueueNum
);
return
;
}
else
if
(
commandLine
.
hasOption
(
'c'
))
{
String
clusterName
=
commandLine
.
getOptionValue
(
'c'
).
trim
();
defaultMQAdminExt
.
start
();
Set
<
String
>
masterSet
=
CommandUtil
.
fetchMasterAddrByClusterName
(
defaultMQAdminExt
,
clusterName
);
for
(
String
addr
:
masterSet
)
{
try
{
defaultMQAdminExt
.
setMessageRequestMode
(
addr
,
topicName
,
groupName
,
mode
,
popShareQueueNum
,
5000
);
System
.
out
.
printf
(
"set consume mode to %s success.%n"
,
addr
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
Thread
.
sleep
(
1000
*
1
);
}
}
System
.
out
.
printf
(
"topic[%s] group[%s] consume mode[%s] popShareQueueNum[%d]"
,
topicName
,
groupName
,
mode
.
toString
(),
popShareQueueNum
);
return
;
}
ServerUtil
.
printCommandLineHelp
(
"mqadmin "
+
this
.
commandName
(),
options
);
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
" command failed"
,
e
);
}
finally
{
defaultMQAdminExt
.
shutdown
();
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录