Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
a83cdc5f
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
a83cdc5f
编写于
3年前
作者:
G
Git_Yang
提交者:
GitHub
3年前
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[format] Fix formatting errors (#3380)
Signed-off-by:
N
zhangyang
<
Git_Yang@163.com
>
上级
4dbdbf0a
master
5.0.0-alpha
5.0.0-alpha-static-topic
develop
release-4.9.2
rocketmq-all-4.9.2
无相关合并请求
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
94 addition
and
93 deletion
+94
-93
acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
...rc/main/java/org/apache/rocketmq/acl/common/AclUtils.java
+3
-4
example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
...java/org/apache/rocketmq/example/simple/PullConsumer.java
+91
-89
未找到文件。
acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
浏览文件 @
a83cdc5f
...
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
...
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import
java.io.FileWriter
;
import
java.io.FileWriter
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.io.PrintWriter
;
import
java.io.PrintWriter
;
import
java.util.ArrayList
;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.SortedMap
;
import
java.util.SortedMap
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
...
@@ -210,9 +209,9 @@ public class AclUtils {
...
@@ -210,9 +209,9 @@ public class AclUtils {
// expand netaddress
// expand netaddress
int
separatorCount
=
StringUtils
.
countMatches
(
netaddress
,
":"
);
int
separatorCount
=
StringUtils
.
countMatches
(
netaddress
,
":"
);
int
padCount
=
part
-
separatorCount
;
int
padCount
=
part
-
separatorCount
;
if
(
padCount
>
0
)
{
if
(
padCount
>
0
)
{
StringBuilder
padStr
=
new
StringBuilder
(
":"
);
StringBuilder
padStr
=
new
StringBuilder
(
":"
);
for
(
int
i
=
0
;
i
<
padCount
;
i
++)
{
for
(
int
i
=
0
;
i
<
padCount
;
i
++)
{
padStr
.
append
(
":"
);
padStr
.
append
(
":"
);
}
}
netaddress
=
StringUtils
.
replace
(
netaddress
,
"::"
,
padStr
.
toString
());
netaddress
=
StringUtils
.
replace
(
netaddress
,
"::"
,
padStr
.
toString
());
...
@@ -221,7 +220,7 @@ public class AclUtils {
...
@@ -221,7 +220,7 @@ public class AclUtils {
// pad netaddress
// pad netaddress
String
[]
strArray
=
StringUtils
.
splitPreserveAllTokens
(
netaddress
,
":"
);
String
[]
strArray
=
StringUtils
.
splitPreserveAllTokens
(
netaddress
,
":"
);
for
(
int
i
=
0
;
i
<
strArray
.
length
;
i
++)
{
for
(
int
i
=
0
;
i
<
strArray
.
length
;
i
++)
{
if
(
strArray
[
i
].
length
()
<
4
)
{
if
(
strArray
[
i
].
length
()
<
4
)
{
strArray
[
i
]
=
StringUtils
.
leftPad
(
strArray
[
i
],
4
,
'0'
);
strArray
[
i
]
=
StringUtils
.
leftPad
(
strArray
[
i
],
4
,
'0'
);
}
}
}
}
...
...
This diff is collapsed.
Click to expand it.
example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
浏览文件 @
a83cdc5f
...
@@ -36,7 +36,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
...
@@ -36,7 +36,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public
class
PullConsumer
{
public
class
PullConsumer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
{
DefaultMQPullConsumer
consumer
=
new
DefaultMQPullConsumer
(
"please_rename_unique_group_name_5"
);
DefaultMQPullConsumer
consumer
=
new
DefaultMQPullConsumer
(
"please_rename_unique_group_name_5"
);
consumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
consumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
Set
<
String
>
topics
=
new
HashSet
<>();
Set
<
String
>
topics
=
new
HashSet
<>();
...
@@ -46,99 +46,101 @@ public class PullConsumer {
...
@@ -46,99 +46,101 @@ public class PullConsumer {
consumer
.
start
();
consumer
.
start
();
ExecutorService
executors
=
Executors
.
newFixedThreadPool
(
topics
.
size
(),
new
ThreadFactory
()
{
ExecutorService
executors
=
Executors
.
newFixedThreadPool
(
topics
.
size
(),
new
ThreadFactory
()
{
@Override
@Override
public
Thread
newThread
(
Runnable
r
)
{
public
Thread
newThread
(
Runnable
r
)
{
return
new
Thread
(
r
,
"PullConsumerThread"
);
return
new
Thread
(
r
,
"PullConsumerThread"
);
}
}
});
});
for
(
String
topic
:
consumer
.
getRegisterTopics
()){
for
(
String
topic
:
consumer
.
getRegisterTopics
())
{
executors
.
execute
(
new
Runnable
()
{
executors
.
execute
(
new
Runnable
()
{
public
void
doSomething
(
List
<
MessageExt
>
msgs
){
public
void
doSomething
(
List
<
MessageExt
>
msgs
)
{
//do you business
//do you business
System
.
out
.
println
(
msgs
);
}
}
@Override
public
void
run
()
{
@Override
while
(
true
){
public
void
run
()
{
try
{
while
(
true
)
{
Set
<
MessageQueue
>
messageQueues
=
consumer
.
fetchMessageQueuesInBalance
(
topic
);
try
{
if
(
messageQueues
==
null
||
messageQueues
.
isEmpty
()){
Set
<
MessageQueue
>
messageQueues
=
consumer
.
fetchMessageQueuesInBalance
(
topic
);
Thread
.
sleep
(
1000
);
if
(
messageQueues
==
null
||
messageQueues
.
isEmpty
())
{
continue
;
Thread
.
sleep
(
1000
);
}
continue
;
PullResult
pullResult
=
null
;
}
for
(
MessageQueue
messageQueue
:
messageQueues
){
PullResult
pullResult
=
null
;
try
{
for
(
MessageQueue
messageQueue
:
messageQueues
)
{
long
offset
=
this
.
consumeFromOffset
(
messageQueue
);
try
{
pullResult
=
consumer
.
pull
(
messageQueue
,
"*"
,
offset
,
32
);
long
offset
=
this
.
consumeFromOffset
(
messageQueue
);
switch
(
pullResult
.
getPullStatus
())
{
pullResult
=
consumer
.
pull
(
messageQueue
,
"*"
,
offset
,
32
);
case
FOUND:
switch
(
pullResult
.
getPullStatus
())
{
List
<
MessageExt
>
msgs
=
pullResult
.
getMsgFoundList
();
case
FOUND:
List
<
MessageExt
>
msgs
=
pullResult
.
getMsgFoundList
();
if
(
msgs
!=
null
&&
!
msgs
.
isEmpty
()){
this
.
doSomething
(
msgs
);
if
(
msgs
!=
null
&&
!
msgs
.
isEmpty
())
{
//update offset to broker
this
.
doSomething
(
msgs
);
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
//update offset to broker
//print pull tps
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
this
.
incPullTPS
(
topic
,
pullResult
.
getMsgFoundList
().
size
());
//print pull tps
}
this
.
incPullTPS
(
topic
,
pullResult
.
getMsgFoundList
().
size
());
break
;
}
case
OFFSET_ILLEGAL:
break
;
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
case
OFFSET_ILLEGAL:
break
;
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
case
NO_NEW_MSG:
break
;
Thread
.
sleep
(
1
);
case
NO_NEW_MSG:
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
Thread
.
sleep
(
1
);
break
;
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
case
NO_MATCHED_MSG:
break
;
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
case
NO_MATCHED_MSG:
break
;
consumer
.
updateConsumeOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
default
:
break
;
}
default
:
}
catch
(
RemotingException
e
)
{
}
e
.
printStackTrace
();
}
catch
(
RemotingException
e
)
{
}
catch
(
MQBrokerException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
catch
(
MQBrokerException
e
)
{
}
catch
(
Exception
e
){
e
.
printStackTrace
();
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
}
e
.
printStackTrace
();
}
}
}
catch
(
MQClientException
e
)
{
}
//reblance error
}
catch
(
MQClientException
e
)
{
e
.
printStackTrace
();
//reblance error
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
}
catch
(
Exception
e
){
e
.
printStackTrace
();
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
}
e
.
printStackTrace
();
}
}
}
}
}
public
long
consumeFromOffset
(
MessageQueue
messageQueue
)
throws
MQClientException
{
//-1 when started
public
long
consumeFromOffset
(
MessageQueue
messageQueue
)
throws
MQClientException
{
long
offset
=
consumer
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_MEMORY
);
//-1 when started
if
(
offset
<
0
){
long
offset
=
consumer
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_MEMORY
);
//query from broker
if
(
offset
<
0
)
{
offset
=
consumer
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_STORE
);
//query from broker
}
offset
=
consumer
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_STORE
);
if
(
offset
<
0
){
}
//first time start from last offset
if
(
offset
<
0
)
{
offset
=
consumer
.
maxOffset
(
messageQueue
);
//first time start from last offset
offset
=
consumer
.
maxOffset
(
messageQueue
);
}
}
//make sure
//make sure
if
(
offset
<
0
){
if
(
offset
<
0
)
{
offset
=
0
;
offset
=
0
;
}
}
return
offset
;
return
offset
;
}
}
public
void
incPullTPS
(
String
topic
,
int
pullSize
)
{
consumer
.
getDefaultMQPullConsumerImpl
().
getRebalanceImpl
().
getmQClientFactory
()
public
void
incPullTPS
(
String
topic
,
int
pullSize
)
{
.
getConsumerStatsManager
().
incPullTPS
(
consumer
.
getConsumerGroup
(),
topic
,
pullSize
);
consumer
.
getDefaultMQPullConsumerImpl
().
getRebalanceImpl
().
getmQClientFactory
()
}
.
getConsumerStatsManager
().
incPullTPS
(
consumer
.
getConsumerGroup
(),
topic
,
pullSize
);
});
}
});
}
}
// executors.shutdown();
// executors.shutdown();
// consumer.shutdown();
// consumer.shutdown();
...
...
This diff is collapsed.
Click to expand it.
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录
反馈
建议
客服
返回
顶部