Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
e2c697bf
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
e2c697bf
编写于
2月 25, 2019
作者:
D
duhenglucky
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Modify checkstyle remove chinese characters
上级
0235a706
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
21 addition
and
24 deletion
+21
-24
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
...rg/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+20
-23
snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
...he/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
+1
-1
未找到文件。
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
浏览文件 @
e2c697bf
...
...
@@ -16,6 +16,19 @@
*/
package
org.apache.rocketmq.client.trace
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.UUID
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.rocketmq.client.common.ThreadLocalIndex
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner
;
...
...
@@ -34,21 +47,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
java.io.IOException
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.UUID
;
import
java.util.List
;
import
java.util.ArrayList
;
import
java.util.Map
;
import
java.util.HashMap
;
import
java.util.Set
;
import
java.util.HashSet
;
import
static
org
.
apache
.
rocketmq
.
client
.
trace
.
TraceConstants
.
TRACE_INSTANCE_NAME
;
public
class
AsyncTraceDispatcher
implements
TraceDispatcher
{
...
...
@@ -73,7 +71,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private
String
traceTopicName
;
private
AtomicBoolean
isStarted
=
new
AtomicBoolean
(
false
);
public
AsyncTraceDispatcher
(
String
traceTopicName
,
RPCHook
rpcHook
)
throws
MQClientException
{
// queueSize is greater than or equal to the n power of 2 of value
this
.
queueSize
=
2048
;
...
...
@@ -88,12 +85,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this
.
traceTopicName
=
MixAll
.
RMQ_SYS_TRACE_TOPIC
;
}
this
.
traceExecuter
=
new
ThreadPoolExecutor
(
//
10
,
//
20
,
//
1000
*
60
,
//
TimeUnit
.
MILLISECONDS
,
//
this
.
appenderQueue
,
//
new
ThreadFactoryImpl
(
"MQTraceSendThread_"
));
10
,
//
20
,
//
1000
*
60
,
//
TimeUnit
.
MILLISECONDS
,
//
this
.
appenderQueue
,
//
new
ThreadFactoryImpl
(
"MQTraceSendThread_"
));
traceProducer
=
getAndCreateTraceProducer
(
rpcHook
);
}
...
...
@@ -317,7 +314,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data
the message trace data in this batch
* @param data the message trace data in this batch
*/
private
void
sendTraceDataByMQ
(
Set
<
String
>
keySet
,
final
String
data
)
{
String
topic
=
traceTopicName
;
...
...
snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
浏览文件 @
e2c697bf
...
...
@@ -48,7 +48,7 @@ public class LocalEnodeServiceImpl implements EnodeService {
RemotingCommand
request
)
{
CompletableFuture
<
RemotingCommand
>
completableFuture
=
new
CompletableFuture
<>();
try
{
log
.
debug
(
"Send message request
:
{}"
,
request
);
log
.
debug
(
"Send message request
:
{}"
,
request
);
RemotingCommand
remotingCommand
=
this
.
brokerController
.
getSendProcessor
().
processRequest
(
remotingChannel
,
request
);
CodecHelper
.
encodeHeader
(
remotingCommand
);
completableFuture
.
complete
(
remotingCommand
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录