Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
e04db85e
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
e04db85e
编写于
11月 14, 2018
作者:
H
huzongtang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
I have finished developing the new feature for the message track #525 initially.
上级
37223cac
变更
55
显示空白变更内容
内联
并排
Showing
55 changed file
with
1827 addition
and
51 deletion
+1827
-51
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+8
-0
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
.../org/apache/rocketmq/broker/topic/TopicConfigManager.java
+11
-0
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
...pache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+59
-5
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
...apache/rocketmq/client/impl/factory/MQClientInstance.java
+1
-1
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
...rg/apache/rocketmq/client/producer/DefaultMQProducer.java
+61
-6
client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
...pache/rocketmq/client/producer/TransactionMQProducer.java
+2
-2
client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java
...che/rocketmq/client/trace/core/Utils/TrackTraceUtils.java
+105
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java
...che/rocketmq/client/trace/core/common/TrackTraceBean.java
+144
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
...ocketmq/client/trace/core/common/TrackTraceConstants.java
+34
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java
.../rocketmq/client/trace/core/common/TrackTraceContext.java
+136
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java
...ketmq/client/trace/core/common/TrackTraceDataEncoder.java
+175
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java
...mq/client/trace/core/common/TrackTraceDispatcherType.java
+22
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java
...etmq/client/trace/core/common/TrackTraceTransferBean.java
+44
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java
...che/rocketmq/client/trace/core/common/TrackTraceType.java
+26
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java
.../rocketmq/client/trace/core/dispatch/AsyncDispatcher.java
+51
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
...client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+358
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
...t/trace/core/dispatch/impl/TrackTraceProducerFactory.java
+71
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java
...q/client/trace/core/hook/ConsumeMessageTraceHookImpl.java
+113
-0
client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
...etmq/client/trace/core/hook/SendMessageTrackHookImpl.java
+99
-0
client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
...e/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+2
-2
client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
...q/client/impl/consumer/DefaultMQPushConsumerImplTest.java
+1
-1
client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
.../rocketmq/client/impl/consumer/RebalancePushImplTest.java
+1
-1
client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
...pache/rocketmq/client/producer/DefaultMQProducerTest.java
+2
-2
client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
...rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+234
-0
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
...rc/main/java/org/apache/rocketmq/common/BrokerConfig.java
+10
-1
common/src/main/java/org/apache/rocketmq/common/MixAll.java
common/src/main/java/org/apache/rocketmq/common/MixAll.java
+3
-1
distribution/conf/2m-noslave/broker-a.properties
distribution/conf/2m-noslave/broker-a.properties
+1
-0
distribution/conf/2m-noslave/broker-b.properties
distribution/conf/2m-noslave/broker-b.properties
+1
-0
distribution/conf/2m-noslave/broker-trace.properties
distribution/conf/2m-noslave/broker-trace.properties
+24
-0
example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
...rg/apache/rocketmq/example/batch/SimpleBatchProducer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
...org/apache/rocketmq/example/batch/SplitBatchProducer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
.../java/org/apache/rocketmq/example/benchmark/Consumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
.../java/org/apache/rocketmq/example/benchmark/Producer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
...a/org/apache/rocketmq/example/broadcast/PushConsumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
...ain/java/org/apache/rocketmq/example/filter/Consumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
...ain/java/org/apache/rocketmq/example/filter/Producer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
.../java/org/apache/rocketmq/example/filter/SqlConsumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
.../java/org/apache/rocketmq/example/filter/SqlProducer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
.../java/org/apache/rocketmq/example/operation/Consumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
.../java/org/apache/rocketmq/example/operation/Producer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
...va/org/apache/rocketmq/example/ordermessage/Consumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
...va/org/apache/rocketmq/example/ordermessage/Producer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
...java/org/apache/rocketmq/example/quickstart/Consumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
...java/org/apache/rocketmq/example/quickstart/Producer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
...ava/org/apache/rocketmq/example/simple/AsyncProducer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
...ain/java/org/apache/rocketmq/example/simple/Producer.java
+1
-2
example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
...java/org/apache/rocketmq/example/simple/PushConsumer.java
+3
-3
example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
...java/org/apache/rocketmq/example/simple/TestProducer.java
+1
-1
logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
.../apache/rocketmq/logappender/common/ProducerInstance.java
+1
-1
logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
...ava/org/apache/rocketmq/logappender/AbstractTestCase.java
+1
-1
test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
...rg/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+1
-1
test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
...ava/org/apache/rocketmq/test/factory/ProducerFactory.java
+1
-1
tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
...e/rocketmq/tools/command/broker/SendMsgStatusCommand.java
+1
-1
tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
...ocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+1
-1
tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
...ava/org/apache/rocketmq/tools/monitor/MonitorService.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
e04db85e
...
...
@@ -228,6 +228,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return
response
;
}
if
(
this
.
brokerController
.
getBrokerConfig
().
isAutoTraceBrokerEnable
())
{
String
errorMsg
=
"the topic["
+
requestHeader
.
getTopic
()
+
"] is user self defined topic and this node is trace broker!"
;
log
.
warn
(
errorMsg
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
errorMsg
);
return
response
;
}
try
{
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setOpaque
(
request
.
getOpaque
());
...
...
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
浏览文件 @
e04db85e
...
...
@@ -124,6 +124,17 @@ public class TopicConfigManager extends ConfigManager {
topicConfig
.
setWriteQueueNums
(
1
);
this
.
topicConfigTable
.
put
(
topicConfig
.
getTopicName
(),
topicConfig
);
}
{
// MixAll.RMQ_SYS_TRACK_TRACE_TOPIC
if
(
this
.
brokerController
.
getBrokerConfig
().
isAutoTraceBrokerEnable
())
{
String
topic
=
MixAll
.
RMQ_SYS_TRACK_TRACE_TOPIC
;
TopicConfig
topicConfig
=
new
TopicConfig
(
topic
);
this
.
systemTopicList
.
add
(
topic
);
topicConfig
.
setReadQueueNums
(
1
);
topicConfig
.
setWriteQueueNums
(
1
);
this
.
topicConfigTable
.
put
(
topicConfig
.
getTopicName
(),
topicConfig
);
}
}
}
public
boolean
isSystemTopic
(
final
String
topic
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
浏览文件 @
e04db85e
...
...
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.QueryResult
;
...
...
@@ -29,6 +30,12 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceConstants
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType
;
import
org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher
;
import
org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher
;
import
org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
...
...
@@ -36,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
...
...
@@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
*/
public
class
DefaultMQPushConsumer
extends
ClientConfig
implements
MQPushConsumer
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
...
...
@@ -246,11 +256,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private
long
consumeTimeout
=
15
;
/**
* Interface of asynchronous transfer data
*/
private
AsyncDispatcher
traceDispatcher
=
null
;
/**
* Default constructor.
*/
public
DefaultMQPushConsumer
()
{
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
null
,
new
AllocateMessageQueueAveragely
());
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
null
,
new
AllocateMessageQueueAveragely
()
,
false
);
}
/**
...
...
@@ -261,10 +276,29 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public
DefaultMQPushConsumer
(
final
String
consumerGroup
,
RPCHook
rpcHook
,
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
)
{
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
,
boolean
msgTraceSwitch
)
{
this
.
consumerGroup
=
consumerGroup
;
this
.
allocateMessageQueueStrategy
=
allocateMessageQueueStrategy
;
defaultMQPushConsumerImpl
=
new
DefaultMQPushConsumerImpl
(
this
,
rpcHook
);
//if client open the message track trace feature
if
(
msgTraceSwitch
)
{
try
{
Properties
tempProperties
=
new
Properties
();
tempProperties
.
put
(
TrackTraceConstants
.
MAX_MSG_SIZE
,
"128000"
);
tempProperties
.
put
(
TrackTraceConstants
.
ASYNC_BUFFER_SIZE
,
"2048"
);
tempProperties
.
put
(
TrackTraceConstants
.
MAX_BATCH_NUM
,
"100"
);
tempProperties
.
put
(
TrackTraceConstants
.
INSTANCE_NAME
,
"PID_CLIENT_INNER_TRACE_PRODUCER"
);
tempProperties
.
put
(
TrackTraceConstants
.
TRACE_DISPATCHER_TYPE
,
TrackTraceDispatcherType
.
CONSUMER
.
name
());
AsyncArrayDispatcher
dispatcher
=
new
AsyncArrayDispatcher
(
tempProperties
);
dispatcher
.
setHostConsumer
(
this
.
getDefaultMQPushConsumerImpl
());
traceDispatcher
=
dispatcher
;
this
.
getDefaultMQPushConsumerImpl
().
registerConsumeMessageHook
(
new
ConsumeMessageTraceHookImpl
(
traceDispatcher
));
}
catch
(
Throwable
e
)
{
log
.
error
(
"system mqtrace hook init failed ,maybe can't send msg trace data"
);
}
}
}
/**
...
...
@@ -273,7 +307,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param rpcHook RPC hook to execute before each remoting command.
*/
public
DefaultMQPushConsumer
(
RPCHook
rpcHook
)
{
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
rpcHook
,
new
AllocateMessageQueueAveragely
());
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
rpcHook
,
new
AllocateMessageQueueAveragely
()
,
false
);
}
/**
...
...
@@ -281,8 +315,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
*/
public
DefaultMQPushConsumer
(
final
String
consumerGroup
)
{
this
(
consumerGroup
,
null
,
new
AllocateMessageQueueAveragely
());
public
DefaultMQPushConsumer
(
final
String
consumerGroup
,
boolean
msgTraceSwitch
)
{
this
(
consumerGroup
,
null
,
new
AllocateMessageQueueAveragely
()
,
msgTraceSwitch
);
}
@Override
...
...
@@ -518,6 +552,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public
void
start
()
throws
MQClientException
{
this
.
defaultMQPushConsumerImpl
.
start
();
if
(
null
!=
traceDispatcher
)
{
try
{
Properties
tempProperties
=
new
Properties
();
tempProperties
.
put
(
TrackTraceConstants
.
NAMESRV_ADDR
,
this
.
getNamesrvAddr
());
traceDispatcher
.
start
(
tempProperties
);
}
catch
(
MQClientException
e
)
{
log
.
warn
(
"trace dispatcher start failed "
,
e
);
}
}
}
/**
...
...
@@ -526,6 +569,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public
void
shutdown
()
{
this
.
defaultMQPushConsumerImpl
.
shutdown
();
if
(
null
!=
traceDispatcher
)
{
traceDispatcher
.
shutdown
();
}
}
@Override
...
...
@@ -694,4 +740,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public
void
setConsumeTimeout
(
final
long
consumeTimeout
)
{
this
.
consumeTimeout
=
consumeTimeout
;
}
public
AsyncDispatcher
getTraceDispatcher
()
{
return
traceDispatcher
;
}
public
void
setTraceDispatcher
(
AsyncDispatcher
traceDispatcher
)
{
this
.
traceDispatcher
=
traceDispatcher
;
}
}
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
浏览文件 @
e04db85e
...
...
@@ -144,7 +144,7 @@ public class MQClientInstance {
this
.
rebalanceService
=
new
RebalanceService
(
this
);
this
.
defaultMQProducer
=
new
DefaultMQProducer
(
MixAll
.
CLIENT_INNER_PRODUCER_GROUP
);
this
.
defaultMQProducer
=
new
DefaultMQProducer
(
MixAll
.
CLIENT_INNER_PRODUCER_GROUP
,
false
);
this
.
defaultMQProducer
.
resetClientConfig
(
clientConfig
);
this
.
consumerStatsManager
=
new
ConsumerStatsManager
(
this
.
scheduledExecutorService
);
...
...
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
浏览文件 @
e04db85e
...
...
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer;
import
java.util.Collection
;
import
java.util.List
;
import
java.util.Properties
;
import
java.util.concurrent.ExecutorService
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.QueryResult
;
...
...
@@ -25,6 +26,12 @@ import org.apache.rocketmq.client.Validators;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceConstants
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType
;
import
org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher
;
import
org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher
;
import
org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageBatch
;
...
...
@@ -33,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageId
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.netty.NettyRemotingClient
;
...
...
@@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
*/
public
class
DefaultMQProducer
extends
ClientConfig
implements
MQProducer
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
...
...
@@ -119,11 +129,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private
int
maxMessageSize
=
1024
*
1024
*
4
;
// 4M
/**
* Interface of asynchronous transfer data
*/
private
AsyncDispatcher
traceDispatcher
=
null
;
/**
* Default constructor.
*/
public
DefaultMQProducer
()
{
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
null
);
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
null
,
false
);
}
/**
...
...
@@ -131,10 +146,30 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace
*/
public
DefaultMQProducer
(
final
String
producerGroup
,
RPCHook
rpcHook
)
{
public
DefaultMQProducer
(
final
String
producerGroup
,
RPCHook
rpcHook
,
boolean
msgTraceSwitch
)
{
this
.
producerGroup
=
producerGroup
;
defaultMQProducerImpl
=
new
DefaultMQProducerImpl
(
this
,
rpcHook
);
//if client open the message track trace feature
if
(
msgTraceSwitch
)
{
try
{
Properties
tempProperties
=
new
Properties
();
tempProperties
.
put
(
TrackTraceConstants
.
MAX_MSG_SIZE
,
"128000"
);
tempProperties
.
put
(
TrackTraceConstants
.
ASYNC_BUFFER_SIZE
,
"2048"
);
tempProperties
.
put
(
TrackTraceConstants
.
MAX_BATCH_NUM
,
"100"
);
tempProperties
.
put
(
TrackTraceConstants
.
INSTANCE_NAME
,
"PID_CLIENT_INNER_TRACE_PRODUCER"
);
tempProperties
.
put
(
TrackTraceConstants
.
TRACE_DISPATCHER_TYPE
,
TrackTraceDispatcherType
.
PRODUCER
.
name
());
AsyncArrayDispatcher
dispatcher
=
new
AsyncArrayDispatcher
(
tempProperties
);
dispatcher
.
setHostProducer
(
this
.
getDefaultMQProducerImpl
());
traceDispatcher
=
dispatcher
;
this
.
getDefaultMQProducerImpl
().
registerSendMessageHook
(
new
SendMessageTrackHookImpl
(
traceDispatcher
));
}
catch
(
Throwable
e
)
{
log
.
error
(
"system mqtrace hook init failed ,maybe can't send msg trace data"
);
}
}
}
/**
...
...
@@ -142,8 +177,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*
* @param producerGroup Producer group, see the name-sake field.
*/
public
DefaultMQProducer
(
final
String
producerGroup
)
{
this
(
producerGroup
,
null
);
public
DefaultMQProducer
(
final
String
producerGroup
,
boolean
msgTraceSwitch
)
{
this
(
producerGroup
,
null
,
msgTraceSwitch
);
}
/**
...
...
@@ -152,7 +187,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public
DefaultMQProducer
(
RPCHook
rpcHook
)
{
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
rpcHook
);
this
(
MixAll
.
DEFAULT_PRODUCER_GROUP
,
rpcHook
,
false
);
}
/**
...
...
@@ -170,6 +205,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public
void
start
()
throws
MQClientException
{
this
.
defaultMQProducerImpl
.
start
();
if
(
null
!=
traceDispatcher
)
{
try
{
Properties
tempProperties
=
new
Properties
();
tempProperties
.
put
(
TrackTraceConstants
.
NAMESRV_ADDR
,
this
.
getNamesrvAddr
());
traceDispatcher
.
start
(
tempProperties
);
}
catch
(
MQClientException
e
)
{
log
.
warn
(
"trace dispatcher start failed "
,
e
);
}
}
}
/**
...
...
@@ -178,6 +222,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public
void
shutdown
()
{
this
.
defaultMQProducerImpl
.
shutdown
();
if
(
null
!=
traceDispatcher
)
{
traceDispatcher
.
shutdown
();
}
}
/**
...
...
@@ -777,4 +824,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public
void
setRetryTimesWhenSendAsyncFailed
(
final
int
retryTimesWhenSendAsyncFailed
)
{
this
.
retryTimesWhenSendAsyncFailed
=
retryTimesWhenSendAsyncFailed
;
}
public
AsyncDispatcher
getTraceDispatcher
()
{
return
traceDispatcher
;
}
public
void
setTraceDispatcher
(
AsyncDispatcher
traceDispatcher
)
{
this
.
traceDispatcher
=
traceDispatcher
;
}
}
client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
浏览文件 @
e04db85e
...
...
@@ -35,11 +35,11 @@ public class TransactionMQProducer extends DefaultMQProducer {
}
public
TransactionMQProducer
(
final
String
producerGroup
)
{
super
(
producerGroup
);
super
(
producerGroup
,
false
);
}
public
TransactionMQProducer
(
final
String
producerGroup
,
RPCHook
rpcHook
)
{
super
(
producerGroup
,
rpcHook
);
super
(
producerGroup
,
rpcHook
,
false
);
}
@Override
...
...
client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.Utils
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
java.net.Inet6Address
;
import
java.net.InetAddress
;
import
java.net.NetworkInterface
;
import
java.net.SocketException
;
import
java.net.UnknownHostException
;
import
java.util.ArrayList
;
import
java.util.Enumeration
;
/**
* Track Trace Util
*/
public
class
TrackTraceUtils
{
public
static
String
getLocalAddress
()
{
try
{
//Traverse the network bind card to find a valid IP address and return it.
Enumeration
<
NetworkInterface
>
enumeration
=
NetworkInterface
.
getNetworkInterfaces
();
ArrayList
<
String
>
ipv4Result
=
new
ArrayList
<
String
>();
ArrayList
<
String
>
ipv6Result
=
new
ArrayList
<
String
>();
while
(
enumeration
.
hasMoreElements
())
{
final
NetworkInterface
networkInterface
=
enumeration
.
nextElement
();
final
Enumeration
<
InetAddress
>
en
=
networkInterface
.
getInetAddresses
();
while
(
en
.
hasMoreElements
())
{
final
InetAddress
address
=
en
.
nextElement
();
if
(!
address
.
isLoopbackAddress
())
{
if
(
address
instanceof
Inet6Address
)
{
ipv6Result
.
add
(
normalizeHostAddress
(
address
));
}
else
{
ipv4Result
.
add
(
normalizeHostAddress
(
address
));
}
}
}
}
// get priority to IPv4
if
(!
ipv4Result
.
isEmpty
())
{
for
(
String
ip
:
ipv4Result
)
{
if
(
ip
.
startsWith
(
"127.0"
)
||
ip
.
startsWith
(
"192.168"
))
{
continue
;
}
return
ip
;
}
//get the last one
return
ipv4Result
.
get
(
ipv4Result
.
size
()
-
1
);
}
//then use the ipv6 address
else
if
(!
ipv6Result
.
isEmpty
())
{
return
ipv6Result
.
get
(
0
);
}
//the use local ip address
final
InetAddress
localHost
=
InetAddress
.
getLocalHost
();
return
normalizeHostAddress
(
localHost
);
}
catch
(
SocketException
e
)
{
e
.
printStackTrace
();
}
catch
(
UnknownHostException
e
)
{
e
.
printStackTrace
();
}
finally
{
}
return
null
;
}
public
static
String
normalizeHostAddress
(
final
InetAddress
localHost
)
{
if
(
localHost
instanceof
Inet6Address
)
{
return
"["
+
localHost
.
getHostAddress
()
+
"]"
;
}
else
{
return
localHost
.
getHostAddress
();
}
}
public
static
String
toJson
(
final
Object
obj
,
boolean
prettyFormat
)
{
return
RemotingSerializable
.
toJson
(
obj
,
prettyFormat
);
}
public
static
<
T
>
T
fromJson
(
String
json
,
Class
<
T
>
classOfT
)
{
return
RemotingSerializable
.
fromJson
(
json
,
classOfT
);
}
public
static
String
replaceNull
(
String
ori
)
{
return
ori
==
null
?
""
:
ori
;
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
import
org.apache.rocketmq.client.trace.core.Utils.TrackTraceUtils
;
import
org.apache.rocketmq.common.message.MessageType
;
public
class
TrackTraceBean
{
private
static
final
String
LOCAL_ADDRESS
=
TrackTraceUtils
.
getLocalAddress
();
private
String
topic
=
""
;
private
String
msgId
=
""
;
private
String
offsetMsgId
=
""
;
private
String
tags
=
""
;
private
String
keys
=
""
;
private
String
storeHost
=
LOCAL_ADDRESS
;
private
String
clientHost
=
LOCAL_ADDRESS
;
private
long
storeTime
;
private
int
retryTimes
;
private
int
bodyLength
;
private
MessageType
msgType
;
public
MessageType
getMsgType
()
{
return
msgType
;
}
public
void
setMsgType
(
final
MessageType
msgType
)
{
this
.
msgType
=
msgType
;
}
public
String
getOffsetMsgId
()
{
return
offsetMsgId
;
}
public
void
setOffsetMsgId
(
final
String
offsetMsgId
)
{
this
.
offsetMsgId
=
offsetMsgId
;
}
public
String
getTopic
()
{
return
topic
;
}
public
void
setTopic
(
String
topic
)
{
this
.
topic
=
topic
;
}
public
String
getMsgId
()
{
return
msgId
;
}
public
void
setMsgId
(
String
msgId
)
{
this
.
msgId
=
msgId
;
}
public
String
getTags
()
{
return
tags
;
}
public
void
setTags
(
String
tags
)
{
this
.
tags
=
tags
;
}
public
String
getKeys
()
{
return
keys
;
}
public
void
setKeys
(
String
keys
)
{
this
.
keys
=
keys
;
}
public
String
getStoreHost
()
{
return
storeHost
;
}
public
void
setStoreHost
(
String
storeHost
)
{
this
.
storeHost
=
storeHost
;
}
public
String
getClientHost
()
{
return
clientHost
;
}
public
void
setClientHost
(
String
clientHost
)
{
this
.
clientHost
=
clientHost
;
}
public
long
getStoreTime
()
{
return
storeTime
;
}
public
void
setStoreTime
(
long
storeTime
)
{
this
.
storeTime
=
storeTime
;
}
public
int
getRetryTimes
()
{
return
retryTimes
;
}
public
void
setRetryTimes
(
int
retryTimes
)
{
this
.
retryTimes
=
retryTimes
;
}
public
int
getBodyLength
()
{
return
bodyLength
;
}
public
void
setBodyLength
(
int
bodyLength
)
{
this
.
bodyLength
=
bodyLength
;
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
import
org.apache.rocketmq.common.MixAll
;
public
class
TrackTraceConstants
{
public
static
final
String
NAMESRV_ADDR
=
"NAMESRV_ADDR"
;
public
static
final
String
ADDRSRV_URL
=
"ADDRSRV_URL"
;
public
static
final
String
INSTANCE_NAME
=
"InstanceName"
;
public
static
final
String
ASYNC_BUFFER_SIZE
=
"AsyncBufferSize"
;
public
static
final
String
MAX_BATCH_NUM
=
"MaxBatchNum"
;
public
static
final
String
WAKE_UP_NUM
=
"WakeUpNum"
;
public
static
final
String
MAX_MSG_SIZE
=
"MaxMsgSize"
;
public
static
final
String
GROUP_NAME
=
"_INNER_TRACE_PRODUCER"
;
public
static
final
String
TRACE_TOPIC
=
MixAll
.
RMQ_SYS_TRACK_TRACE_TOPIC
;
public
static
final
char
CONTENT_SPLITOR
=
(
char
)
1
;
public
static
final
char
FIELD_SPLITOR
=
(
char
)
2
;
public
static
final
String
TRACE_DISPATCHER_TYPE
=
"DispatcherType"
;
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
import
org.apache.rocketmq.common.message.MessageClientIDSetter
;
import
java.util.List
;
/**
* The context of Track Trace
*/
public
class
TrackTraceContext
implements
Comparable
<
TrackTraceContext
>
{
private
TrackTraceType
traceType
;
private
long
timeStamp
=
System
.
currentTimeMillis
();
private
String
regionId
=
""
;
private
String
regionName
=
""
;
private
String
groupName
=
""
;
private
int
costTime
=
0
;
private
boolean
isSuccess
=
true
;
private
String
requestId
=
MessageClientIDSetter
.
createUniqID
();
private
int
contextCode
=
0
;
private
List
<
TrackTraceBean
>
traceBeans
;
public
int
getContextCode
()
{
return
contextCode
;
}
public
void
setContextCode
(
final
int
contextCode
)
{
this
.
contextCode
=
contextCode
;
}
public
List
<
TrackTraceBean
>
getTraceBeans
()
{
return
traceBeans
;
}
public
void
setTraceBeans
(
List
<
TrackTraceBean
>
traceBeans
)
{
this
.
traceBeans
=
traceBeans
;
}
public
String
getRegionId
()
{
return
regionId
;
}
public
void
setRegionId
(
String
regionId
)
{
this
.
regionId
=
regionId
;
}
public
TrackTraceType
getTraceType
()
{
return
traceType
;
}
public
void
setTraceType
(
TrackTraceType
traceType
)
{
this
.
traceType
=
traceType
;
}
public
long
getTimeStamp
()
{
return
timeStamp
;
}
public
void
setTimeStamp
(
long
timeStamp
)
{
this
.
timeStamp
=
timeStamp
;
}
public
String
getGroupName
()
{
return
groupName
;
}
public
void
setGroupName
(
String
groupName
)
{
this
.
groupName
=
groupName
;
}
public
int
getCostTime
()
{
return
costTime
;
}
public
void
setCostTime
(
int
costTime
)
{
this
.
costTime
=
costTime
;
}
public
boolean
isSuccess
()
{
return
isSuccess
;
}
public
void
setSuccess
(
boolean
success
)
{
isSuccess
=
success
;
}
public
String
getRequestId
()
{
return
requestId
;
}
public
void
setRequestId
(
String
requestId
)
{
this
.
requestId
=
requestId
;
}
public
String
getRegionName
()
{
return
regionName
;
}
public
void
setRegionName
(
String
regionName
)
{
this
.
regionName
=
regionName
;
}
@Override
public
int
compareTo
(
TrackTraceContext
o
)
{
return
(
int
)
(
this
.
timeStamp
-
o
.
getTimeStamp
());
}
@Override
public
String
toString
()
{
StringBuilder
sb
=
new
StringBuilder
(
1024
);
sb
.
append
(
traceType
).
append
(
"_"
).
append
(
groupName
)
.
append
(
"_"
).
append
(
regionId
).
append
(
"_"
).
append
(
isSuccess
).
append
(
"_"
);
if
(
traceBeans
!=
null
&&
traceBeans
.
size
()
>
0
)
{
for
(
TrackTraceBean
bean
:
traceBeans
)
{
sb
.
append
(
bean
.
getMsgId
()
+
"_"
+
bean
.
getTopic
()
+
"_"
);
}
}
return
"TrackTraceContext{"
+
sb
.
toString
()
+
'}'
;
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
import
org.apache.rocketmq.common.message.MessageType
;
import
java.util.ArrayList
;
import
java.util.List
;
import
static
org
.
apache
.
rocketmq
.
client
.
trace
.
core
.
common
.
TrackTraceType
.
Pub
;
/**
* encode/decode for Track Trace Data
*/
public
class
TrackTraceDataEncoder
{
/**
* resolving traceContext list From track trace data String
*
* @param traceData
* @return
*/
public
static
List
<
TrackTraceContext
>
decoderFromTraceDataString
(
String
traceData
)
{
List
<
TrackTraceContext
>
resList
=
new
ArrayList
<
TrackTraceContext
>();
if
(
traceData
==
null
||
traceData
.
length
()
<=
0
)
{
return
resList
;
}
String
[]
contextList
=
traceData
.
split
(
String
.
valueOf
(
TrackTraceConstants
.
FIELD_SPLITOR
));
for
(
String
context
:
contextList
)
{
String
[]
line
=
context
.
split
(
String
.
valueOf
(
TrackTraceConstants
.
CONTENT_SPLITOR
));
if
(
line
[
0
].
equals
(
Pub
.
name
()))
{
TrackTraceContext
pubContext
=
new
TrackTraceContext
();
pubContext
.
setTraceType
(
Pub
);
pubContext
.
setTimeStamp
(
Long
.
parseLong
(
line
[
1
]));
pubContext
.
setRegionId
(
line
[
2
]);
pubContext
.
setGroupName
(
line
[
3
]);
TrackTraceBean
bean
=
new
TrackTraceBean
();
bean
.
setTopic
(
line
[
4
]);
bean
.
setMsgId
(
line
[
5
]);
bean
.
setTags
(
line
[
6
]);
bean
.
setKeys
(
line
[
7
]);
bean
.
setStoreHost
(
line
[
8
]);
bean
.
setBodyLength
(
Integer
.
parseInt
(
line
[
9
]));
pubContext
.
setCostTime
(
Integer
.
parseInt
(
line
[
10
]));
bean
.
setMsgType
(
MessageType
.
values
()[
Integer
.
parseInt
(
line
[
11
])]);
if
(
line
.
length
==
13
)
{
pubContext
.
setSuccess
(
Boolean
.
parseBoolean
(
line
[
12
]));
}
else
if
(
line
.
length
==
14
)
{
bean
.
setOffsetMsgId
(
line
[
12
]);
pubContext
.
setSuccess
(
Boolean
.
parseBoolean
(
line
[
13
]));
}
pubContext
.
setTraceBeans
(
new
ArrayList
<
TrackTraceBean
>(
1
));
pubContext
.
getTraceBeans
().
add
(
bean
);
resList
.
add
(
pubContext
);
}
else
if
(
line
[
0
].
equals
(
TrackTraceType
.
SubBefore
.
name
()))
{
TrackTraceContext
subBeforeContext
=
new
TrackTraceContext
();
subBeforeContext
.
setTraceType
(
TrackTraceType
.
SubBefore
);
subBeforeContext
.
setTimeStamp
(
Long
.
parseLong
(
line
[
1
]));
subBeforeContext
.
setRegionId
(
line
[
2
]);
subBeforeContext
.
setGroupName
(
line
[
3
]);
subBeforeContext
.
setRequestId
(
line
[
4
]);
TrackTraceBean
bean
=
new
TrackTraceBean
();
bean
.
setMsgId
(
line
[
5
]);
bean
.
setRetryTimes
(
Integer
.
parseInt
(
line
[
6
]));
bean
.
setKeys
(
line
[
7
]);
subBeforeContext
.
setTraceBeans
(
new
ArrayList
<
TrackTraceBean
>(
1
));
subBeforeContext
.
getTraceBeans
().
add
(
bean
);
resList
.
add
(
subBeforeContext
);
}
else
if
(
line
[
0
].
equals
(
TrackTraceType
.
SubAfter
.
name
()))
{
TrackTraceContext
subAfterContext
=
new
TrackTraceContext
();
subAfterContext
.
setTraceType
(
TrackTraceType
.
SubAfter
);
subAfterContext
.
setRequestId
(
line
[
1
]);
TrackTraceBean
bean
=
new
TrackTraceBean
();
bean
.
setMsgId
(
line
[
2
]);
bean
.
setKeys
(
line
[
5
]);
subAfterContext
.
setTraceBeans
(
new
ArrayList
<
TrackTraceBean
>(
1
));
subAfterContext
.
getTraceBeans
().
add
(
bean
);
subAfterContext
.
setCostTime
(
Integer
.
parseInt
(
line
[
3
]));
subAfterContext
.
setSuccess
(
Boolean
.
parseBoolean
(
line
[
4
]));
if
(
line
.
length
>=
7
)
{
// add the context type
subAfterContext
.
setContextCode
(
Integer
.
parseInt
(
line
[
6
]));
}
resList
.
add
(
subAfterContext
);
}
}
return
resList
;
}
/**
* Encoding the trace context into track data strings and keyset sets
*
* @param ctx
* @return
*/
public
static
TrackTraceTransferBean
encoderFromContextBean
(
TrackTraceContext
ctx
)
{
if
(
ctx
==
null
)
{
return
null
;
}
//build message track trace of the transfering entity content bean
TrackTraceTransferBean
transferBean
=
new
TrackTraceTransferBean
();
StringBuilder
sb
=
new
StringBuilder
(
256
);
switch
(
ctx
.
getTraceType
())
{
case
Pub:
{
TrackTraceBean
bean
=
ctx
.
getTraceBeans
().
get
(
0
);
//append the content of context and traceBean to transferBean's TransData
sb
.
append
(
ctx
.
getTraceType
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getTimeStamp
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getRegionId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getGroupName
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getTopic
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getMsgId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getTags
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getKeys
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getStoreHost
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getBodyLength
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getCostTime
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getMsgType
().
ordinal
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getOffsetMsgId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
isSuccess
()).
append
(
TrackTraceConstants
.
FIELD_SPLITOR
);
}
break
;
case
SubBefore:
{
for
(
TrackTraceBean
bean
:
ctx
.
getTraceBeans
())
{
sb
.
append
(
ctx
.
getTraceType
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getTimeStamp
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getRegionId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getGroupName
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getRequestId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getMsgId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getRetryTimes
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getKeys
()).
append
(
TrackTraceConstants
.
FIELD_SPLITOR
);
//
}
}
break
;
case
SubAfter:
{
for
(
TrackTraceBean
bean
:
ctx
.
getTraceBeans
())
{
sb
.
append
(
ctx
.
getTraceType
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getRequestId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getMsgId
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getCostTime
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
isSuccess
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
bean
.
getKeys
()).
append
(
TrackTraceConstants
.
CONTENT_SPLITOR
)
//
.
append
(
ctx
.
getContextCode
()).
append
(
TrackTraceConstants
.
FIELD_SPLITOR
);
}
}
break
;
default
:
}
transferBean
.
setTransData
(
sb
.
toString
());
for
(
TrackTraceBean
bean
:
ctx
.
getTraceBeans
())
{
transferBean
.
getTransKey
().
add
(
bean
.
getMsgId
());
if
(
bean
.
getKeys
()
!=
null
&&
bean
.
getKeys
().
length
()
>
0
)
{
transferBean
.
getTransKey
().
add
(
bean
.
getKeys
());
}
}
return
transferBean
;
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
public
enum
TrackTraceDispatcherType
{
PRODUCER
,
CONSUMER
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
import
java.util.HashSet
;
import
java.util.Set
;
/**
* track trace transfering bean
*/
public
class
TrackTraceTransferBean
{
private
String
transData
;
private
Set
<
String
>
transKey
=
new
HashSet
<
String
>();
public
String
getTransData
()
{
return
transData
;
}
public
void
setTransData
(
String
transData
)
{
this
.
transData
=
transData
;
}
public
Set
<
String
>
getTransKey
()
{
return
transKey
;
}
public
void
setTransKey
(
Set
<
String
>
transKey
)
{
this
.
transKey
=
transKey
;
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.common
;
/**
* Created by zongtanghu on 2018/11/6.
*/
public
enum
TrackTraceType
{
Pub
,
SubBefore
,
SubAfter
,
}
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.dispatch
;
import
java.util.Properties
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
java.io.IOException
;
/**
* Interface of asynchronous transfer data
*/
public
interface
AsyncDispatcher
{
/**
* Initialize asynchronous transfer data module
*/
void
start
(
Properties
properties
)
throws
MQClientException
;
/**
* append the transfering data
* @param ctx data infomation
* @return
*/
boolean
append
(
Object
ctx
);
/**
* write flush action
*
* @throws IOException
*/
void
flush
()
throws
IOException
;
/**
* close the track trace Hook
*/
void
shutdown
();
}
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.dispatch.impl
;
import
org.apache.rocketmq.client.common.ThreadLocalIndex
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
;
import
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
;
import
org.apache.rocketmq.client.impl.producer.TopicPublishInfo
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.MessageQueueSelector
;
import
org.apache.rocketmq.client.producer.SendCallback
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceConstants
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceContext
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean
;
import
org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
java.io.IOException
;
import
java.util.List
;
import
java.util.HashMap
;
import
java.util.UUID
;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.HashSet
;
import
java.util.ArrayList
;
import
java.util.Map
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicLong
;
/**
* Created by zongtanghu on 2018/11/6.
*/
public
class
AsyncArrayDispatcher
implements
AsyncDispatcher
{
private
final
static
InternalLogger
log
=
ClientLogger
.
getLog
();
private
final
int
queueSize
;
private
final
int
batchSize
;
private
final
DefaultMQProducer
traceProducer
;
private
final
ThreadPoolExecutor
traceExecuter
;
// the last discard number of log
private
AtomicLong
discardCount
;
private
Thread
worker
;
private
ArrayBlockingQueue
<
TrackTraceContext
>
traceContextQueue
;
private
ArrayBlockingQueue
<
Runnable
>
appenderQueue
;
private
volatile
Thread
shutDownHook
;
private
volatile
boolean
stopped
=
false
;
private
String
dispatcherType
;
private
DefaultMQProducerImpl
hostProducer
;
private
DefaultMQPushConsumerImpl
hostConsumer
;
private
volatile
ThreadLocalIndex
sendWhichQueue
=
new
ThreadLocalIndex
();
private
String
dispatcherId
=
UUID
.
randomUUID
().
toString
();
public
AsyncArrayDispatcher
(
Properties
properties
)
throws
MQClientException
{
dispatcherType
=
properties
.
getProperty
(
TrackTraceConstants
.
TRACE_DISPATCHER_TYPE
);
int
queueSize
=
Integer
.
parseInt
(
properties
.
getProperty
(
TrackTraceConstants
.
ASYNC_BUFFER_SIZE
,
"2048"
));
// queueSize is greater than or equal to the n power of 2 of value
queueSize
=
1
<<
(
32
-
Integer
.
numberOfLeadingZeros
(
queueSize
-
1
));
this
.
queueSize
=
queueSize
;
batchSize
=
Integer
.
parseInt
(
properties
.
getProperty
(
TrackTraceConstants
.
MAX_BATCH_NUM
,
"1"
));
this
.
discardCount
=
new
AtomicLong
(
0L
);
traceContextQueue
=
new
ArrayBlockingQueue
<
TrackTraceContext
>(
1024
);
appenderQueue
=
new
ArrayBlockingQueue
<
Runnable
>(
queueSize
);
this
.
traceExecuter
=
new
ThreadPoolExecutor
(
//
10
,
//
20
,
//
1000
*
60
,
//
TimeUnit
.
MILLISECONDS
,
//
this
.
appenderQueue
,
//
new
ThreadFactoryImpl
(
"MQTraceSendThread_"
));
traceProducer
=
TrackTraceProducerFactory
.
getTraceDispatcherProducer
(
properties
);
}
public
DefaultMQProducer
getTraceProducer
()
{
return
traceProducer
;
}
public
DefaultMQProducerImpl
getHostProducer
()
{
return
hostProducer
;
}
public
void
setHostProducer
(
DefaultMQProducerImpl
hostProducer
)
{
this
.
hostProducer
=
hostProducer
;
}
public
DefaultMQPushConsumerImpl
getHostConsumer
()
{
return
hostConsumer
;
}
public
void
setHostConsumer
(
DefaultMQPushConsumerImpl
hostConsumer
)
{
this
.
hostConsumer
=
hostConsumer
;
}
public
void
start
(
Properties
properties
)
throws
MQClientException
{
TrackTraceProducerFactory
.
registerTraceDispatcher
(
dispatcherId
,
properties
.
getProperty
(
TrackTraceConstants
.
NAMESRV_ADDR
));
this
.
worker
=
new
Thread
(
new
AsyncRunnable
(),
"MQ-AsyncArrayDispatcher-Thread-"
+
dispatcherId
);
this
.
worker
.
setDaemon
(
true
);
this
.
worker
.
start
();
this
.
registerShutDownHook
();
}
@Override
public
boolean
append
(
final
Object
ctx
)
{
boolean
result
=
traceContextQueue
.
offer
((
TrackTraceContext
)
ctx
);
if
(!
result
)
{
log
.
info
(
"buffer full"
+
discardCount
.
incrementAndGet
()
+
" ,context is "
+
ctx
);
}
return
result
;
}
@Override
public
void
flush
()
throws
IOException
{
// the maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long
end
=
System
.
currentTimeMillis
()
+
500
;
while
(
traceContextQueue
.
size
()
>
0
||
appenderQueue
.
size
()
>
0
&&
System
.
currentTimeMillis
()
<=
end
)
{
try
{
Thread
.
sleep
(
1
);
}
catch
(
InterruptedException
e
)
{
break
;
}
}
log
.
info
(
"------end trace send "
+
traceContextQueue
.
size
()
+
" "
+
appenderQueue
.
size
());
}
@Override
public
void
shutdown
()
{
this
.
stopped
=
true
;
this
.
traceExecuter
.
shutdown
();
TrackTraceProducerFactory
.
unregisterTraceDispatcher
(
dispatcherId
);
this
.
removeShutdownHook
();
}
public
void
registerShutDownHook
()
{
if
(
shutDownHook
==
null
)
{
shutDownHook
=
new
Thread
(
new
Runnable
()
{
private
volatile
boolean
hasShutdown
=
false
;
@Override
public
void
run
()
{
synchronized
(
this
)
{
if
(!
this
.
hasShutdown
)
{
try
{
flush
();
}
catch
(
IOException
e
)
{
log
.
error
(
"system MQTrace hook shutdown failed ,maybe loss some trace data"
);
}
}
}
}
},
"ShutdownHookMQTrace"
);
Runtime
.
getRuntime
().
addShutdownHook
(
shutDownHook
);
}
}
public
void
removeShutdownHook
()
{
if
(
shutDownHook
!=
null
)
{
Runtime
.
getRuntime
().
removeShutdownHook
(
shutDownHook
);
}
}
class
AsyncRunnable
implements
Runnable
{
private
boolean
stopped
;
@Override
public
void
run
()
{
while
(!
stopped
)
{
List
<
TrackTraceContext
>
contexts
=
new
ArrayList
<
TrackTraceContext
>(
batchSize
);
for
(
int
i
=
0
;
i
<
batchSize
;
i
++)
{
TrackTraceContext
context
=
null
;
try
{
//get track trace data element from blocking Queue — traceContextQueue
context
=
traceContextQueue
.
poll
(
5
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
e
)
{
}
if
(
context
!=
null
)
{
contexts
.
add
(
context
);
}
else
{
break
;
}
}
if
(
contexts
.
size
()
>
0
)
{
AsyncAppenderRequest
request
=
new
AsyncAppenderRequest
(
contexts
);
traceExecuter
.
submit
(
request
);
}
else
if
(
AsyncArrayDispatcher
.
this
.
stopped
)
{
this
.
stopped
=
true
;
}
}
}
}
class
AsyncAppenderRequest
implements
Runnable
{
List
<
TrackTraceContext
>
contextList
;
public
AsyncAppenderRequest
(
final
List
<
TrackTraceContext
>
contextList
)
{
if
(
contextList
!=
null
)
{
this
.
contextList
=
contextList
;
}
else
{
this
.
contextList
=
new
ArrayList
<
TrackTraceContext
>(
1
);
}
}
@Override
public
void
run
()
{
sendTraceData
(
contextList
);
}
public
void
sendTraceData
(
List
<
TrackTraceContext
>
contextList
)
{
Map
<
String
,
List
<
TrackTraceTransferBean
>>
transBeanMap
=
new
HashMap
<
String
,
List
<
TrackTraceTransferBean
>>();
for
(
TrackTraceContext
context
:
contextList
)
{
if
(
context
.
getTraceBeans
().
isEmpty
())
{
continue
;
}
//1.topic value corresponding to original message entity content
String
topic
=
context
.
getTraceBeans
().
get
(
0
).
getTopic
();
//2.use original message entity's topic as key
String
key
=
topic
;
List
<
TrackTraceTransferBean
>
transBeanList
=
transBeanMap
.
get
(
key
);
if
(
transBeanList
==
null
)
{
transBeanList
=
new
ArrayList
<
TrackTraceTransferBean
>();
transBeanMap
.
put
(
key
,
transBeanList
);
}
TrackTraceTransferBean
traceData
=
TrackTraceDataEncoder
.
encoderFromContextBean
(
context
);
transBeanList
.
add
(
traceData
);
}
for
(
Map
.
Entry
<
String
,
List
<
TrackTraceTransferBean
>>
entry
:
transBeanMap
.
entrySet
())
{
//key -> dataTopic(Not trace Topic)
String
dataTopic
=
entry
.
getKey
();
flushData
(
entry
.
getValue
(),
dataTopic
);
}
}
/**
* batch sending data actually
*/
private
void
flushData
(
List
<
TrackTraceTransferBean
>
transBeanList
,
String
topic
)
{
if
(
transBeanList
.
size
()
==
0
)
{
return
;
}
// temporary buffer
StringBuilder
buffer
=
new
StringBuilder
(
1024
);
int
count
=
0
;
Set
<
String
>
keySet
=
new
HashSet
<
String
>();
for
(
TrackTraceTransferBean
bean
:
transBeanList
)
{
// keyset of message track trace includes msgId of or original message
keySet
.
addAll
(
bean
.
getTransKey
());
buffer
.
append
(
bean
.
getTransData
());
count
++;
// Ensure that the size of the package should not exceed the upper limit.
if
(
buffer
.
length
()
>=
traceProducer
.
getMaxMessageSize
())
{
sendTraceDataByMQ
(
keySet
,
buffer
.
toString
());
// clear temporary buffer after finishing
buffer
.
delete
(
0
,
buffer
.
length
());
keySet
.
clear
();
count
=
0
;
}
}
if
(
count
>
0
)
{
sendTraceDataByMQ
(
keySet
,
buffer
.
toString
());
}
transBeanList
.
clear
();
}
/**
* send message track trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message track trace data in this batch
*/
private
void
sendTraceDataByMQ
(
Set
<
String
>
keySet
,
final
String
data
)
{
String
topic
=
TrackTraceConstants
.
TRACE_TOPIC
;
final
Message
message
=
new
Message
(
topic
,
data
.
getBytes
());
//keyset of message track trace includes msgId of or original message
message
.
setKeys
(
keySet
);
try
{
Set
<
String
>
traceBrokerSet
=
tryGetMessageQueueBrokerSet
(
traceProducer
.
getDefaultMQProducerImpl
(),
topic
);
SendCallback
callback
=
new
SendCallback
()
{
@Override
public
void
onSuccess
(
SendResult
sendResult
)
{
}
@Override
public
void
onException
(
Throwable
e
)
{
log
.
info
(
"send trace data ,the traceData is "
+
data
);
}
};
if
(
traceBrokerSet
.
isEmpty
())
{
//no cross set
traceProducer
.
send
(
message
,
callback
,
5000
);
}
else
{
traceProducer
.
send
(
message
,
new
MessageQueueSelector
()
{
@Override
public
MessageQueue
select
(
List
<
MessageQueue
>
mqs
,
Message
msg
,
Object
arg
)
{
Set
<
String
>
brokerSet
=
(
Set
<
String
>)
arg
;
List
<
MessageQueue
>
filterMqs
=
new
ArrayList
<
MessageQueue
>();
for
(
MessageQueue
queue
:
mqs
)
{
if
(
brokerSet
.
contains
(
queue
.
getBrokerName
()))
{
filterMqs
.
add
(
queue
);
}
}
int
index
=
sendWhichQueue
.
getAndIncrement
();
int
pos
=
Math
.
abs
(
index
)
%
filterMqs
.
size
();
if
(
pos
<
0
)
{
pos
=
0
;
}
return
filterMqs
.
get
(
pos
);
}
},
traceBrokerSet
,
callback
);
}
}
catch
(
Exception
e
)
{
log
.
info
(
"send trace data,the traceData is"
+
data
);
}
}
private
Set
<
String
>
tryGetMessageQueueBrokerSet
(
DefaultMQProducerImpl
producer
,
String
topic
)
{
Set
<
String
>
brokerSet
=
new
HashSet
<
String
>();
TopicPublishInfo
topicPublishInfo
=
producer
.
getTopicPublishInfoTable
().
get
(
topic
);
if
(
null
==
topicPublishInfo
||
!
topicPublishInfo
.
ok
())
{
producer
.
getTopicPublishInfoTable
().
putIfAbsent
(
topic
,
new
TopicPublishInfo
());
producer
.
getmQClientFactory
().
updateTopicRouteInfoFromNameServer
(
topic
);
topicPublishInfo
=
producer
.
getTopicPublishInfoTable
().
get
(
topic
);
}
if
(
topicPublishInfo
.
isHaveTopicRouterInfo
()
||
topicPublishInfo
.
ok
())
{
for
(
MessageQueue
queue
:
topicPublishInfo
.
getMessageQueueList
())
{
brokerSet
.
add
(
queue
.
getBrokerName
());
}
}
return
brokerSet
;
}
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.dispatch.impl
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceConstants
;
import
org.apache.rocketmq.common.namesrv.TopAddressing
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.atomic.AtomicBoolean
;
public
class
TrackTraceProducerFactory
{
private
static
Map
<
String
,
Object
>
dispatcherTable
=
new
ConcurrentHashMap
<
String
,
Object
>();
private
static
AtomicBoolean
isStarted
=
new
AtomicBoolean
(
false
);
private
static
DefaultMQProducer
traceProducer
;
public
static
DefaultMQProducer
getTraceDispatcherProducer
(
Properties
properties
)
{
if
(
traceProducer
==
null
)
{
traceProducer
=
new
DefaultMQProducer
();
traceProducer
.
setProducerGroup
(
TrackTraceConstants
.
GROUP_NAME
);
traceProducer
.
setSendMsgTimeout
(
5000
);
traceProducer
.
setInstanceName
(
properties
.
getProperty
(
TrackTraceConstants
.
INSTANCE_NAME
,
String
.
valueOf
(
System
.
currentTimeMillis
())));
String
nameSrv
=
properties
.
getProperty
(
TrackTraceConstants
.
NAMESRV_ADDR
);
if
(
nameSrv
==
null
)
{
TopAddressing
topAddressing
=
new
TopAddressing
(
properties
.
getProperty
(
TrackTraceConstants
.
ADDRSRV_URL
));
nameSrv
=
topAddressing
.
fetchNSAddr
();
}
traceProducer
.
setNamesrvAddr
(
nameSrv
);
traceProducer
.
setVipChannelEnabled
(
false
);
//the max size of message is 128K
int
maxSize
=
Integer
.
parseInt
(
properties
.
getProperty
(
TrackTraceConstants
.
MAX_MSG_SIZE
,
"128000"
));
traceProducer
.
setMaxMessageSize
(
maxSize
-
10
*
1000
);
}
return
traceProducer
;
}
public
static
void
registerTraceDispatcher
(
String
dispatcherId
,
String
nameSrvAddr
)
throws
MQClientException
{
dispatcherTable
.
put
(
dispatcherId
,
new
Object
());
if
(
traceProducer
!=
null
&&
isStarted
.
compareAndSet
(
false
,
true
))
{
traceProducer
.
setNamesrvAddr
(
nameSrvAddr
);
traceProducer
.
start
();
}
}
public
static
void
unregisterTraceDispatcher
(
String
dispatcherId
)
{
dispatcherTable
.
remove
(
dispatcherId
);
if
(
dispatcherTable
.
isEmpty
()
&&
traceProducer
!=
null
&&
isStarted
.
get
())
{
traceProducer
.
shutdown
();
}
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.hook
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeReturnType
;
import
org.apache.rocketmq.client.hook.ConsumeMessageContext
;
import
org.apache.rocketmq.client.hook.ConsumeMessageHook
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceBean
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceContext
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceType
;
import
org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
java.util.ArrayList
;
import
java.util.List
;
public
class
ConsumeMessageTraceHookImpl
implements
ConsumeMessageHook
{
private
AsyncDispatcher
localDispatcher
;
public
ConsumeMessageTraceHookImpl
(
AsyncDispatcher
localDispatcher
)
{
this
.
localDispatcher
=
localDispatcher
;
}
@Override
public
String
hookName
()
{
return
"ConsumeMessageTraceHook"
;
}
@Override
public
void
consumeMessageBefore
(
ConsumeMessageContext
context
)
{
if
(
context
==
null
||
context
.
getMsgList
()
==
null
||
context
.
getMsgList
().
isEmpty
())
{
return
;
}
TrackTraceContext
traceContext
=
new
TrackTraceContext
();
context
.
setMqTraceContext
(
traceContext
);
traceContext
.
setTraceType
(
TrackTraceType
.
SubBefore
);
//
traceContext
.
setGroupName
(
context
.
getConsumerGroup
());
//
List
<
TrackTraceBean
>
beans
=
new
ArrayList
<
TrackTraceBean
>();
for
(
MessageExt
msg
:
context
.
getMsgList
())
{
if
(
msg
==
null
)
{
continue
;
}
String
regionId
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_MSG_REGION
);
String
traceOn
=
msg
.
getProperty
(
MessageConst
.
PROPERTY_TRACE_SWITCH
);
if
(
traceOn
!=
null
&&
traceOn
.
equals
(
"false"
))
{
// if trace switch is false ,skip it
continue
;
}
TrackTraceBean
traceBean
=
new
TrackTraceBean
();
traceBean
.
setTopic
(
msg
.
getTopic
());
//
traceBean
.
setMsgId
(
msg
.
getMsgId
());
//
traceBean
.
setTags
(
msg
.
getTags
());
//
traceBean
.
setKeys
(
msg
.
getKeys
());
//
traceBean
.
setStoreTime
(
msg
.
getStoreTimestamp
());
//
traceBean
.
setBodyLength
(
msg
.
getStoreSize
());
//
traceBean
.
setRetryTimes
(
msg
.
getReconsumeTimes
());
//
traceContext
.
setRegionId
(
regionId
);
//
beans
.
add
(
traceBean
);
}
if
(
beans
.
size
()
>
0
)
{
traceContext
.
setTraceBeans
(
beans
);
traceContext
.
setTimeStamp
(
System
.
currentTimeMillis
());
localDispatcher
.
append
(
traceContext
);
}
}
@Override
public
void
consumeMessageAfter
(
ConsumeMessageContext
context
)
{
if
(
context
==
null
||
context
.
getMsgList
()
==
null
||
context
.
getMsgList
().
isEmpty
())
{
return
;
}
TrackTraceContext
subBeforeContext
=
(
TrackTraceContext
)
context
.
getMqTraceContext
();
if
(
subBeforeContext
.
getTraceBeans
()
==
null
||
subBeforeContext
.
getTraceBeans
().
size
()
<
1
)
{
// if subbefore bean is null ,skip it
return
;
}
TrackTraceContext
subAfterContext
=
new
TrackTraceContext
();
subAfterContext
.
setTraceType
(
TrackTraceType
.
SubAfter
);
//
subAfterContext
.
setRegionId
(
subBeforeContext
.
getRegionId
());
//
subAfterContext
.
setGroupName
(
subBeforeContext
.
getGroupName
());
//
subAfterContext
.
setRequestId
(
subBeforeContext
.
getRequestId
());
//
subAfterContext
.
setSuccess
(
context
.
isSuccess
());
//
//caculate the cost time for processing messages
int
costTime
=
(
int
)
((
System
.
currentTimeMillis
()
-
subBeforeContext
.
getTimeStamp
())
/
context
.
getMsgList
().
size
());
subAfterContext
.
setCostTime
(
costTime
);
//
subAfterContext
.
setTraceBeans
(
subBeforeContext
.
getTraceBeans
());
String
contextType
=
context
.
getProps
().
get
(
MixAll
.
CONSUME_CONTEXT_TYPE
);
if
(
contextType
!=
null
)
{
subAfterContext
.
setContextCode
(
ConsumeReturnType
.
valueOf
(
contextType
).
ordinal
());
}
localDispatcher
.
append
(
subAfterContext
);
}
}
client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace.core.hook
;
import
org.apache.rocketmq.client.hook.SendMessageContext
;
import
org.apache.rocketmq.client.hook.SendMessageHook
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceBean
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceConstants
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceContext
;
import
org.apache.rocketmq.client.trace.core.common.TrackTraceType
;
import
org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher
;
import
org.apache.rocketmq.common.MixAll
;
import
java.util.ArrayList
;
public
class
SendMessageTrackHookImpl
implements
SendMessageHook
{
private
AsyncDispatcher
localDispatcher
;
public
SendMessageTrackHookImpl
(
AsyncDispatcher
localDispatcher
)
{
this
.
localDispatcher
=
localDispatcher
;
}
@Override
public
String
hookName
()
{
return
"SendMessageTrackHook"
;
}
@Override
public
void
sendMessageBefore
(
SendMessageContext
context
)
{
//if it is message track trace data,then it doesn't recorded
if
(
context
==
null
||
context
.
getMessage
().
getTopic
().
startsWith
(
MixAll
.
SYSTEM_TOPIC_PREFIX
))
{
return
;
}
//build the context content of TuxeTraceContext
TrackTraceContext
tuxeContext
=
new
TrackTraceContext
();
tuxeContext
.
setTraceBeans
(
new
ArrayList
<
TrackTraceBean
>(
1
));
context
.
setMqTraceContext
(
tuxeContext
);
tuxeContext
.
setTraceType
(
TrackTraceType
.
Pub
);
tuxeContext
.
setGroupName
(
context
.
getProducerGroup
());
//build the data bean object of message track trace
TrackTraceBean
traceBean
=
new
TrackTraceBean
();
traceBean
.
setTopic
(
context
.
getMessage
().
getTopic
());
traceBean
.
setTags
(
context
.
getMessage
().
getTags
());
traceBean
.
setKeys
(
context
.
getMessage
().
getKeys
());
traceBean
.
setStoreHost
(
context
.
getBrokerAddr
());
traceBean
.
setBodyLength
(
context
.
getMessage
().
getBody
().
length
);
traceBean
.
setMsgType
(
context
.
getMsgType
());
tuxeContext
.
getTraceBeans
().
add
(
traceBean
);
}
@Override
public
void
sendMessageAfter
(
SendMessageContext
context
)
{
//if it is message track trace data,then it doesn't recorded
if
(
context
==
null
||
context
.
getMessage
().
getTopic
().
startsWith
(
TrackTraceConstants
.
TRACE_TOPIC
)
||
context
.
getMqTraceContext
()
==
null
)
{
return
;
}
if
(
context
.
getSendResult
()
==
null
)
{
return
;
}
if
(
context
.
getSendResult
().
getRegionId
()
==
null
||
!
context
.
getSendResult
().
isTraceOn
())
{
// if switch is false,skip it
return
;
}
TrackTraceContext
tuxeContext
=
(
TrackTraceContext
)
context
.
getMqTraceContext
();
TrackTraceBean
traceBean
=
tuxeContext
.
getTraceBeans
().
get
(
0
);
int
costTime
=
(
int
)
((
System
.
currentTimeMillis
()
-
tuxeContext
.
getTimeStamp
())
/
tuxeContext
.
getTraceBeans
().
size
());
tuxeContext
.
setCostTime
(
costTime
);
if
(
context
.
getSendResult
().
getSendStatus
().
equals
(
SendStatus
.
SEND_OK
))
{
tuxeContext
.
setSuccess
(
true
);
}
else
{
tuxeContext
.
setSuccess
(
false
);
}
tuxeContext
.
setRegionId
(
context
.
getSendResult
().
getRegionId
());
traceBean
.
setMsgId
(
context
.
getSendResult
().
getMsgId
());
traceBean
.
setOffsetMsgId
(
context
.
getSendResult
().
getOffsetMsgId
());
traceBean
.
setStoreTime
(
tuxeContext
.
getTimeStamp
()
+
costTime
/
2
);
localDispatcher
.
append
(
tuxeContext
);
}
}
client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
浏览文件 @
e04db85e
...
...
@@ -89,7 +89,7 @@ public class DefaultMQPushConsumerTest {
@Before
public
void
init
()
throws
Exception
{
consumerGroup
=
"FooBarGroup"
+
System
.
currentTimeMillis
();
pushConsumer
=
new
DefaultMQPushConsumer
(
consumerGroup
);
pushConsumer
=
new
DefaultMQPushConsumer
(
consumerGroup
,
false
);
pushConsumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
pushConsumer
.
setPullInterval
(
60
*
1000
);
...
...
@@ -252,7 +252,7 @@ public class DefaultMQPushConsumerTest {
}
private
DefaultMQPushConsumer
createPushConsumer
()
{
DefaultMQPushConsumer
pushConsumer
=
new
DefaultMQPushConsumer
(
consumerGroup
);
DefaultMQPushConsumer
pushConsumer
=
new
DefaultMQPushConsumer
(
consumerGroup
,
false
);
pushConsumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
@Override
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
msgs
,
...
...
client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
浏览文件 @
e04db85e
...
...
@@ -42,7 +42,7 @@ public class DefaultMQPushConsumerImplTest {
//test message
thrown
.
expectMessage
(
"consumeThreadMin (10) is larger than consumeThreadMax (9)"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"test_consumer_group"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"test_consumer_group"
,
false
);
consumer
.
setConsumeThreadMin
(
10
);
consumer
.
setConsumeThreadMax
(
9
);
...
...
client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
浏览文件 @
e04db85e
...
...
@@ -44,7 +44,7 @@ import static org.mockito.Mockito.when;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
RebalancePushImplTest
{
@Spy
private
DefaultMQPushConsumerImpl
defaultMQPushConsumer
=
new
DefaultMQPushConsumerImpl
(
new
DefaultMQPushConsumer
(
"RebalancePushImplTest"
),
null
);
private
DefaultMQPushConsumerImpl
defaultMQPushConsumer
=
new
DefaultMQPushConsumerImpl
(
new
DefaultMQPushConsumer
(
"RebalancePushImplTest"
,
false
),
null
);
@Mock
private
MQClientInstance
mqClientInstance
;
@Mock
...
...
client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
浏览文件 @
e04db85e
...
...
@@ -82,7 +82,7 @@ public class DefaultMQProducerTest {
@Before
public
void
init
()
throws
Exception
{
String
producerGroupTemp
=
producerGroupPrefix
+
System
.
currentTimeMillis
();
producer
=
new
DefaultMQProducer
(
producerGroupTemp
);
producer
=
new
DefaultMQProducer
(
producerGroupTemp
,
false
);
producer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
producer
.
setCompressMsgBodyOverHowmuch
(
16
);
message
=
new
Message
(
topic
,
new
byte
[]
{
'a'
});
...
...
@@ -309,7 +309,7 @@ public class DefaultMQProducerTest {
@Test
public
void
testSetCallbackExecutor
()
throws
MQClientException
{
String
producerGroupTemp
=
"testSetCallbackExecutor_"
+
System
.
currentTimeMillis
();
producer
=
new
DefaultMQProducer
(
producerGroupTemp
);
producer
=
new
DefaultMQProducer
(
producerGroupTemp
,
false
);
producer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
producer
.
start
();
...
...
client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
0 → 100644
浏览文件 @
e04db85e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.trace
;
import
java.lang.reflect.Field
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
net.bytebuddy.asm.Advice
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.hook.SendMessageContext
;
import
org.apache.rocketmq.client.impl.CommunicationMode
;
import
org.apache.rocketmq.client.impl.MQClientAPIImpl
;
import
org.apache.rocketmq.client.impl.MQClientManager
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
;
import
org.apache.rocketmq.client.impl.producer.TopicPublishInfo
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendCallback
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.route.BrokerData
;
import
org.apache.rocketmq.common.protocol.route.QueueData
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.netty.NettyRemotingClient
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyInt
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyObject
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
ArgumentMatchers
.
nullable
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
DefaultMQProducerWithTraceTest
{
@Spy
private
MQClientInstance
mQClientFactory
=
MQClientManager
.
getInstance
().
getAndCreateMQClientInstance
(
new
ClientConfig
());
@Mock
private
MQClientAPIImpl
mQClientAPIImpl
;
@Spy
private
MQClientInstance
mQClientTraceFactory
=
MQClientManager
.
getInstance
().
getAndCreateMQClientInstance
(
new
ClientConfig
());
@Mock
private
MQClientAPIImpl
mQClientTraceAPIImpl
;
private
AsyncArrayDispatcher
asyncArrayDispatcher
;
private
DefaultMQProducer
producer
;
private
DefaultMQProducer
traceProducer
;
private
Message
message
;
private
String
topic
=
"FooBar"
;
private
String
producerGroupPrefix
=
"FooBar_PID"
;
private
String
producerGroupTemp
=
producerGroupPrefix
+
System
.
currentTimeMillis
();
private
String
producerGroupTraceTemp
=
MixAll
.
RMQ_SYS_TRACK_TRACE_TOPIC
+
System
.
currentTimeMillis
();
@Before
public
void
init
()
throws
Exception
{
producer
=
new
DefaultMQProducer
(
producerGroupTemp
,
true
);
producer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
message
=
new
Message
(
topic
,
new
byte
[]
{
'a'
,
'b'
,
'c'
});
asyncArrayDispatcher
=
(
AsyncArrayDispatcher
)
producer
.
getTraceDispatcher
();
traceProducer
=
asyncArrayDispatcher
.
getTraceProducer
();
producer
.
start
();
Field
field
=
DefaultMQProducerImpl
.
class
.
getDeclaredField
(
"mQClientFactory"
);
field
.
setAccessible
(
true
);
field
.
set
(
producer
.
getDefaultMQProducerImpl
(),
mQClientFactory
);
Field
fieldTrace
=
DefaultMQProducerImpl
.
class
.
getDeclaredField
(
"mQClientFactory"
);
fieldTrace
.
setAccessible
(
true
);
field
.
set
(
traceProducer
.
getDefaultMQProducerImpl
(),
mQClientTraceFactory
);
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"mQClientAPIImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
mQClientFactory
,
mQClientAPIImpl
);
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"mQClientAPIImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
mQClientTraceFactory
,
mQClientTraceAPIImpl
);
producer
.
getDefaultMQProducerImpl
().
getmQClientFactory
().
registerProducer
(
producerGroupTemp
,
producer
.
getDefaultMQProducerImpl
());
when
(
mQClientAPIImpl
.
sendMessage
(
anyString
(),
anyString
(),
any
(
Message
.
class
),
any
(
SendMessageRequestHeader
.
class
),
anyLong
(),
any
(
CommunicationMode
.
class
),
nullable
(
SendMessageContext
.
class
),
any
(
DefaultMQProducerImpl
.
class
))).
thenCallRealMethod
();
when
(
mQClientAPIImpl
.
sendMessage
(
anyString
(),
anyString
(),
any
(
Message
.
class
),
any
(
SendMessageRequestHeader
.
class
),
anyLong
(),
any
(
CommunicationMode
.
class
),
nullable
(
SendCallback
.
class
),
nullable
(
TopicPublishInfo
.
class
),
nullable
(
MQClientInstance
.
class
),
anyInt
(),
nullable
(
SendMessageContext
.
class
),
any
(
DefaultMQProducerImpl
.
class
)))
.
thenReturn
(
createSendResult
(
SendStatus
.
SEND_OK
));
}
@Test
public
void
testSendMessageSync_WithTrace_Success
()
throws
RemotingException
,
InterruptedException
,
MQBrokerException
,
MQClientException
{
traceProducer
.
getDefaultMQProducerImpl
().
getmQClientFactory
().
registerProducer
(
producerGroupTraceTemp
,
traceProducer
.
getDefaultMQProducerImpl
());
when
(
mQClientAPIImpl
.
getTopicRouteInfoFromNameServer
(
anyString
(),
anyLong
())).
thenReturn
(
createTopicRoute
());
when
(
mQClientTraceAPIImpl
.
getTopicRouteInfoFromNameServer
(
anyString
(),
anyLong
())).
thenReturn
(
createTraceTopicRoute
());
final
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
try
{
producer
.
send
(
message
);
}
catch
(
MQClientException
e
){
}
countDownLatch
.
await
(
3000L
,
TimeUnit
.
MILLISECONDS
);
}
@Test
public
void
testSendMessageSync_WithTrace_NoBrokerSet_Exception
()
throws
RemotingException
,
InterruptedException
,
MQBrokerException
,
MQClientException
{
when
(
mQClientAPIImpl
.
getTopicRouteInfoFromNameServer
(
anyString
(),
anyLong
())).
thenReturn
(
createTopicRoute
());
when
(
mQClientTraceAPIImpl
.
getTopicRouteInfoFromNameServer
(
anyString
(),
anyLong
())).
thenReturn
(
createTraceTopicRoute
());
final
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
try
{
producer
.
send
(
message
);
}
catch
(
MQClientException
e
){
}
countDownLatch
.
await
(
3000L
,
TimeUnit
.
MILLISECONDS
);
}
@After
public
void
terminate
()
{
producer
.
shutdown
();
}
public
static
TopicRouteData
createTopicRoute
()
{
TopicRouteData
topicRouteData
=
new
TopicRouteData
();
topicRouteData
.
setFilterServerTable
(
new
HashMap
<
String
,
List
<
String
>>());
List
<
BrokerData
>
brokerDataList
=
new
ArrayList
<
BrokerData
>();
BrokerData
brokerData
=
new
BrokerData
();
brokerData
.
setBrokerName
(
"BrokerA"
);
brokerData
.
setCluster
(
"DefaultCluster"
);
HashMap
<
Long
,
String
>
brokerAddrs
=
new
HashMap
<
Long
,
String
>();
brokerAddrs
.
put
(
0L
,
"127.0.0.1:10911"
);
brokerData
.
setBrokerAddrs
(
brokerAddrs
);
brokerDataList
.
add
(
brokerData
);
topicRouteData
.
setBrokerDatas
(
brokerDataList
);
List
<
QueueData
>
queueDataList
=
new
ArrayList
<
QueueData
>();
QueueData
queueData
=
new
QueueData
();
queueData
.
setBrokerName
(
"BrokerA"
);
queueData
.
setPerm
(
6
);
queueData
.
setReadQueueNums
(
3
);
queueData
.
setWriteQueueNums
(
4
);
queueData
.
setTopicSynFlag
(
0
);
queueDataList
.
add
(
queueData
);
topicRouteData
.
setQueueDatas
(
queueDataList
);
return
topicRouteData
;
}
private
SendResult
createSendResult
(
SendStatus
sendStatus
)
{
SendResult
sendResult
=
new
SendResult
();
sendResult
.
setMsgId
(
"123"
);
sendResult
.
setOffsetMsgId
(
"123"
);
sendResult
.
setQueueOffset
(
456
);
sendResult
.
setSendStatus
(
sendStatus
);
sendResult
.
setRegionId
(
"HZ"
);
return
sendResult
;
}
public
static
TopicRouteData
createTraceTopicRoute
()
{
TopicRouteData
topicRouteData
=
new
TopicRouteData
();
topicRouteData
.
setFilterServerTable
(
new
HashMap
<
String
,
List
<
String
>>());
List
<
BrokerData
>
brokerDataList
=
new
ArrayList
<
BrokerData
>();
BrokerData
brokerData
=
new
BrokerData
();
brokerData
.
setBrokerName
(
"broker-trace"
);
brokerData
.
setCluster
(
"DefaultCluster"
);
HashMap
<
Long
,
String
>
brokerAddrs
=
new
HashMap
<
Long
,
String
>();
brokerAddrs
.
put
(
0L
,
"127.0.0.1:10912"
);
brokerData
.
setBrokerAddrs
(
brokerAddrs
);
brokerDataList
.
add
(
brokerData
);
topicRouteData
.
setBrokerDatas
(
brokerDataList
);
List
<
QueueData
>
queueDataList
=
new
ArrayList
<
QueueData
>();
QueueData
queueData
=
new
QueueData
();
queueData
.
setBrokerName
(
"broker-trace"
);
queueData
.
setPerm
(
6
);
queueData
.
setReadQueueNums
(
1
);
queueData
.
setWriteQueueNums
(
1
);
queueData
.
setTopicSynFlag
(
1
);
queueDataList
.
add
(
queueData
);
topicRouteData
.
setQueueDatas
(
queueDataList
);
return
topicRouteData
;
}
private
SendResult
createSendTraceResult
(
SendStatus
sendStatus
)
{
SendResult
sendResult
=
new
SendResult
();
sendResult
.
setMsgId
(
"456"
);
sendResult
.
setOffsetMsgId
(
"456"
);
sendResult
.
setQueueOffset
(
789
);
sendResult
.
setSendStatus
(
sendStatus
);
sendResult
.
setRegionId
(
"HZ"
);
return
sendResult
;
}
}
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
浏览文件 @
e04db85e
...
...
@@ -51,7 +51,8 @@ public class BrokerConfig {
@ImportantField
private
boolean
autoCreateSubscriptionGroup
=
true
;
private
String
messageStorePlugIn
=
""
;
@ImportantField
private
boolean
autoTraceBrokerEnable
=
false
;
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
...
...
@@ -732,4 +733,12 @@ public class BrokerConfig {
public
void
setWaitTimeMillsInTransactionQueue
(
long
waitTimeMillsInTransactionQueue
)
{
this
.
waitTimeMillsInTransactionQueue
=
waitTimeMillsInTransactionQueue
;
}
public
boolean
isAutoTraceBrokerEnable
()
{
return
autoTraceBrokerEnable
;
}
public
void
setAutoTraceBrokerEnable
(
boolean
autoTraceBrokerEnable
)
{
this
.
autoTraceBrokerEnable
=
autoTraceBrokerEnable
;
}
}
common/src/main/java/org/apache/rocketmq/common/MixAll.java
浏览文件 @
e04db85e
...
...
@@ -82,6 +82,7 @@ public class MixAll {
public
static
final
long
CURRENT_JVM_PID
=
getPID
();
public
static
final
String
RETRY_GROUP_TOPIC_PREFIX
=
"%RETRY%"
;
public
static
final
String
TRACE_BROKER_NAME_SUFFIX
=
"trace"
;
public
static
final
String
DLQ_GROUP_TOPIC_PREFIX
=
"%DLQ%"
;
public
static
final
String
SYSTEM_TOPIC_PREFIX
=
"rmq_sys_"
;
...
...
@@ -90,6 +91,7 @@ public class MixAll {
public
static
final
String
CONSUME_CONTEXT_TYPE
=
"ConsumeContextType"
;
public
static
final
String
RMQ_SYS_TRANS_HALF_TOPIC
=
"RMQ_SYS_TRANS_HALF_TOPIC"
;
public
static
final
String
RMQ_SYS_TRACK_TRACE_TOPIC
=
"RMQ_SYS_TRACK_TRACE_TOPIC"
;
public
static
final
String
RMQ_SYS_TRANS_OP_HALF_TOPIC
=
"RMQ_SYS_TRANS_OP_HALF_TOPIC"
;
public
static
final
String
CID_SYS_RMQ_TRANS
=
"CID_RMQ_SYS_TRANS"
;
...
...
distribution/conf/2m-noslave/broker-a.properties
浏览文件 @
e04db85e
...
...
@@ -19,3 +19,4 @@ deleteWhen=04
fileReservedTime
=
48
brokerRole
=
ASYNC_MASTER
flushDiskType
=
ASYNC_FLUSH
autoTraceBrokerEnable
=
false
\ No newline at end of file
distribution/conf/2m-noslave/broker-b.properties
浏览文件 @
e04db85e
...
...
@@ -19,3 +19,4 @@ deleteWhen=04
fileReservedTime
=
48
brokerRole
=
ASYNC_MASTER
flushDiskType
=
ASYNC_FLUSH
autoTraceBrokerEnable
=
false
\ No newline at end of file
distribution/conf/2m-noslave/broker-trace.properties
0 → 100644
浏览文件 @
e04db85e
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName
=
DefaultCluster
brokerName
=
broker-trace
brokerId
=
0
deleteWhen
=
04
fileReservedTime
=
48
brokerRole
=
ASYNC_MASTER
flushDiskType
=
ASYNC_FLUSH
autoTraceBrokerEnable
=
true
\ No newline at end of file
example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
浏览文件 @
e04db85e
...
...
@@ -25,7 +25,7 @@ import org.apache.rocketmq.common.message.Message;
public
class
SimpleBatchProducer
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"BatchProducerGroupName"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"BatchProducerGroupName"
,
false
);
producer
.
start
();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
...
...
example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
浏览文件 @
e04db85e
...
...
@@ -28,7 +28,7 @@ public class SplitBatchProducer {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"BatchProducerGroupName"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"BatchProducerGroupName"
,
false
);
producer
.
start
();
//large batch
...
...
example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
浏览文件 @
e04db85e
...
...
@@ -102,7 +102,7 @@ public class Consumer {
}
},
10000
,
10000
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
group
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
group
,
false
);
consumer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
if
(
filterType
==
null
||
expression
==
null
)
{
...
...
example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
浏览文件 @
e04db85e
...
...
@@ -100,7 +100,7 @@ public class Producer {
}
},
10000
,
10000
);
final
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"benchmark_producer"
);
final
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"benchmark_producer"
,
false
);
producer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
if
(
commandLine
.
hasOption
(
'n'
))
{
...
...
example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
浏览文件 @
e04db85e
...
...
@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public
class
PushConsumer
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_1"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_1"
,
false
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
...
...
example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
浏览文件 @
e04db85e
...
...
@@ -30,7 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public
class
Consumer
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
,
IOException
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"ConsumerGroupNamecc4"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"ConsumerGroupNamecc4"
,
false
);
ClassLoader
classLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
File
classFile
=
new
File
(
classLoader
.
getResource
(
"MessageFilterImpl.java"
).
getFile
());
...
...
example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
浏览文件 @
e04db85e
...
...
@@ -24,7 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public
class
Producer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
InterruptedException
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
,
false
);
producer
.
start
();
try
{
...
...
example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
浏览文件 @
e04db85e
...
...
@@ -30,7 +30,7 @@ import java.util.List;
public
class
SqlConsumer
{
public
static
void
main
(
String
[]
args
)
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
,
false
);
try
{
consumer
.
subscribe
(
"TopicTest"
,
MessageSelector
.
bySql
(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))"
+
...
...
example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
浏览文件 @
e04db85e
...
...
@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public
class
SqlProducer
{
public
static
void
main
(
String
[]
args
)
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
,
false
);
try
{
producer
.
start
();
}
catch
(
MQClientException
e
)
{
...
...
example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
浏览文件 @
e04db85e
...
...
@@ -41,7 +41,7 @@ public class Consumer {
String
subscription
=
commandLine
.
getOptionValue
(
's'
);
final
String
returnFailedHalf
=
commandLine
.
getOptionValue
(
'f'
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
group
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
group
,
false
);
consumer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
consumer
.
subscribe
(
topic
,
subscription
);
...
...
example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
浏览文件 @
e04db85e
...
...
@@ -39,7 +39,7 @@ public class Producer {
String
keys
=
commandLine
.
getOptionValue
(
'k'
);
String
msgCount
=
commandLine
.
getOptionValue
(
'c'
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
group
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
group
,
false
);
producer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
producer
.
start
();
...
...
example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
浏览文件 @
e04db85e
...
...
@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public
class
Consumer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_3"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_3"
,
false
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
...
...
example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
浏览文件 @
e04db85e
...
...
@@ -32,7 +32,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public
class
Producer
{
public
static
void
main
(
String
[]
args
)
throws
UnsupportedEncodingException
{
try
{
MQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
MQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
,
false
);
producer
.
start
();
String
[]
tags
=
new
String
[]
{
"TagA"
,
"TagB"
,
"TagC"
,
"TagD"
,
"TagE"
};
...
...
example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
浏览文件 @
e04db85e
...
...
@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
,
false
);
/*
* Specify name server addresses.
...
...
example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
浏览文件 @
e04db85e
...
...
@@ -31,7 +31,7 @@ public class Producer {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
,
false
);
/*
* Specify name server addresses.
...
...
example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
浏览文件 @
e04db85e
...
...
@@ -30,7 +30,7 @@ public class AsyncProducer {
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
InterruptedException
,
UnsupportedEncodingException
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"Jodie_Daily_test"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"Jodie_Daily_test"
,
false
);
producer
.
start
();
producer
.
setRetryTimesWhenSendAsyncFailed
(
0
);
...
...
example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
浏览文件 @
e04db85e
...
...
@@ -25,8 +25,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public
class
Producer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
InterruptedException
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
,
true
);
producer
.
start
();
for
(
int
i
=
0
;
i
<
128
;
i
++)
...
...
example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
浏览文件 @
e04db85e
...
...
@@ -28,11 +28,11 @@ import org.apache.rocketmq.common.message.MessageExt;
public
class
PushConsumer
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"CID_JODIE_1"
);
consumer
.
subscribe
(
"
Jodie_topic_1023
"
,
"*"
);
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"CID_JODIE_1"
,
true
);
consumer
.
subscribe
(
"
TopicTest
"
,
"*"
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
//wrong time format 2017_0422_221800
consumer
.
setConsumeTimestamp
(
"201
70422
221800"
);
consumer
.
setConsumeTimestamp
(
"201
81109
221800"
);
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
@Override
...
...
example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
浏览文件 @
e04db85e
...
...
@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public
class
TestProducer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
InterruptedException
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
);
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
,
false
);
producer
.
start
();
for
(
int
i
=
0
;
i
<
1
;
i
++)
...
...
logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
浏览文件 @
e04db85e
...
...
@@ -63,7 +63,7 @@ public class ProducerInstance {
return
p
;
}
DefaultMQProducer
defaultMQProducer
=
new
DefaultMQProducer
(
group
);
DefaultMQProducer
defaultMQProducer
=
new
DefaultMQProducer
(
group
,
false
);
defaultMQProducer
.
setNamesrvAddr
(
nameServerAddress
);
MQProducer
beforeProducer
=
null
;
beforeProducer
=
getProducerInstance
().
producerMap
.
putIfAbsent
(
genKey
,
defaultMQProducer
);
...
...
logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
浏览文件 @
e04db85e
...
...
@@ -39,7 +39,7 @@ public class AbstractTestCase {
@Before
public
void
mockLoggerAppender
()
throws
Exception
{
DefaultMQProducer
defaultMQProducer
=
spy
(
new
DefaultMQProducer
(
"loggerAppender"
));
DefaultMQProducer
defaultMQProducer
=
spy
(
new
DefaultMQProducer
(
"loggerAppender"
,
false
));
doAnswer
(
new
Answer
<
Void
>()
{
@Override
public
Void
answer
(
InvocationOnMock
invocationOnMock
)
throws
Throwable
{
...
...
test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
浏览文件 @
e04db85e
...
...
@@ -46,7 +46,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
}
public
void
create
(
boolean
useTLS
)
{
consumer
=
new
DefaultMQPushConsumer
(
consumerGroup
);
consumer
=
new
DefaultMQPushConsumer
(
consumerGroup
,
false
);
consumer
.
setInstanceName
(
RandomUtil
.
getStringByUUID
());
consumer
.
setNamesrvAddr
(
nsAddr
);
try
{
...
...
test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
浏览文件 @
e04db85e
...
...
@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.util.RandomUtil;
public
class
ProducerFactory
{
public
static
DefaultMQProducer
getRMQProducer
(
String
ns
)
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
RandomUtil
.
getStringByUUID
());
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
RandomUtil
.
getStringByUUID
()
,
false
);
producer
.
setNamesrvAddr
(
ns
);
try
{
producer
.
start
();
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
浏览文件 @
e04db85e
...
...
@@ -71,7 +71,7 @@ public class SendMsgStatusCommand implements SubCommand {
@Override
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
final
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"PID_SMSC"
,
rpcHook
);
final
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"PID_SMSC"
,
rpcHook
,
false
);
producer
.
setInstanceName
(
"PID_SMSC_"
+
System
.
currentTimeMillis
());
try
{
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
浏览文件 @
e04db85e
...
...
@@ -213,7 +213,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
DefaultMQAdminExt
defaultMQAdminExt
=
new
DefaultMQAdminExt
(
rpcHook
);
defaultMQAdminExt
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
DefaultMQProducer
defaultMQProducer
=
new
DefaultMQProducer
(
"ReSendMsgById"
);
DefaultMQProducer
defaultMQProducer
=
new
DefaultMQProducer
(
"ReSendMsgById"
,
false
);
defaultMQProducer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
try
{
...
...
tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
浏览文件 @
e04db85e
...
...
@@ -65,7 +65,7 @@ public class MonitorService {
private
final
DefaultMQPullConsumer
defaultMQPullConsumer
=
new
DefaultMQPullConsumer
(
MixAll
.
TOOLS_CONSUMER_GROUP
);
private
final
DefaultMQPushConsumer
defaultMQPushConsumer
=
new
DefaultMQPushConsumer
(
MixAll
.
MONITOR_CONSUMER_GROUP
);
MixAll
.
MONITOR_CONSUMER_GROUP
,
false
);
public
MonitorService
(
MonitorConfig
monitorConfig
,
MonitorListener
monitorListener
,
RPCHook
rpcHook
)
{
this
.
monitorConfig
=
monitorConfig
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录