未验证 提交 c50ada6e 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #597 from zongtanghu/develop

[ISSUE #525] Support the message track. Merge to branch msg_track
......@@ -228,6 +228,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is user self defined topic and this node is trace broker!";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
......
......@@ -124,6 +124,16 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
}
public boolean isSystemTopic(final String topic) {
......@@ -154,6 +164,10 @@ public class TopicConfigManager extends ConfigManager {
if (topicConfig != null)
return topicConfig;
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
return topicConfig;
}
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
......@@ -29,6 +30,12 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -36,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
......@@ -246,6 +256,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;
/**
* Interface of asynchronous transfer data
*/
private AsyncDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
......@@ -267,6 +282,39 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (msgTraceSwitch) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying RPC hook.
*
......@@ -276,6 +324,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
}
/**
* Constructor specifying consumer group.
*
......@@ -518,6 +576,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
/**
......@@ -526,6 +593,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
@Override
......@@ -694,4 +764,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}
public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher;
}
}
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
......@@ -25,6 +26,12 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
......@@ -33,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
......@@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
......@@ -119,11 +129,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
/**
* Interface of asynchronous transfer data
*/
private AsyncDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
}
/**
......@@ -137,6 +152,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
/**
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature
if (msgTraceSwitch) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTrackHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying producer group.
*
......@@ -147,8 +193,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Constructor specifying the RPC hook.
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
this(producerGroup, null, msgTraceSwitch);
}
/**
* Constructor specifying the RPC hook.
*
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(RPCHook rpcHook) {
......@@ -170,6 +226,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
/**
......@@ -178,6 +243,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
/**
......@@ -787,4 +855,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}
public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
public class TrackTraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
private String tags = "";
private String keys = "";
private String storeHost = LOCAL_ADDRESS;
private String clientHost = LOCAL_ADDRESS;
private long storeTime;
private int retryTimes;
private int bodyLength;
private MessageType msgType;
public MessageType getMsgType() {
return msgType;
}
public void setMsgType(final MessageType msgType) {
this.msgType = msgType;
}
public String getOffsetMsgId() {
return offsetMsgId;
}
public void setOffsetMsgId(final String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKeys() {
return keys;
}
public void setKeys(String keys) {
this.keys = keys;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public String getClientHost() {
return clientHost;
}
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
public long getStoreTime() {
return storeTime;
}
public void setStoreTime(long storeTime) {
this.storeTime = storeTime;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
import org.apache.rocketmq.common.MixAll;
public class TrackTraceConstants {
public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
public static final String ADDRSRV_URL = "ADDRSRV_URL";
public static final String INSTANCE_NAME = "InstanceName";
public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize";
public static final String MAX_BATCH_NUM = "MaxBatchNum";
public static final String WAKE_UP_NUM = "WakeUpNum";
public static final String MAX_MSG_SIZE = "MaxMsgSize";
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List;
/**
* The context of Track Trace
*/
public class TrackTraceContext implements Comparable<TrackTraceContext> {
private TrackTraceType traceType;
private long timeStamp = System.currentTimeMillis();
private String regionId = "";
private String regionName = "";
private String groupName = "";
private int costTime = 0;
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
private List<TrackTraceBean> traceBeans;
public int getContextCode() {
return contextCode;
}
public void setContextCode(final int contextCode) {
this.contextCode = contextCode;
}
public List<TrackTraceBean> getTraceBeans() {
return traceBeans;
}
public void setTraceBeans(List<TrackTraceBean> traceBeans) {
this.traceBeans = traceBeans;
}
public String getRegionId() {
return regionId;
}
public void setRegionId(String regionId) {
this.regionId = regionId;
}
public TrackTraceType getTraceType() {
return traceType;
}
public void setTraceType(TrackTraceType traceType) {
this.traceType = traceType;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public int getCostTime() {
return costTime;
}
public void setCostTime(int costTime) {
this.costTime = costTime;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getRegionName() {
return regionName;
}
public void setRegionName(String regionName) {
this.regionName = regionName;
}
@Override
public int compareTo(TrackTraceContext o) {
return (int) (this.timeStamp - o.getTimeStamp());
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(1024);
sb.append(traceType).append("_").append(groupName)
.append("_").append(regionId).append("_").append(isSuccess).append("_");
if (traceBeans != null && traceBeans.size() > 0) {
for (TrackTraceBean bean : traceBeans) {
sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
}
}
return "TrackTraceContext{" + sb.toString() + '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList;
import java.util.List;
import static org.apache.rocketmq.client.trace.core.common.TrackTraceType.Pub;
/**
* encode/decode for Track Trace Data
*/
public class TrackTraceDataEncoder {
/**
* resolving traceContext list From track trace data String
*
* @param traceData
* @return
*/
public static List<TrackTraceContext> decoderFromTraceDataString(String traceData) {
List<TrackTraceContext> resList = new ArrayList<TrackTraceContext>();
if (traceData == null || traceData.length() <= 0) {
return resList;
}
String[] contextList = traceData.split(String.valueOf(TrackTraceConstants.FIELD_SPLITOR));
for (String context : contextList) {
String[] line = context.split(String.valueOf(TrackTraceConstants.CONTENT_SPLITOR));
if (line[0].equals(Pub.name())) {
TrackTraceContext pubContext = new TrackTraceContext();
pubContext.setTraceType(Pub);
pubContext.setTimeStamp(Long.parseLong(line[1]));
pubContext.setRegionId(line[2]);
pubContext.setGroupName(line[3]);
TrackTraceBean bean = new TrackTraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setBodyLength(Integer.parseInt(line[9]));
pubContext.setCostTime(Integer.parseInt(line[10]));
bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
if (line.length == 13) {
pubContext.setSuccess(Boolean.parseBoolean(line[12]));
} else if (line.length == 14) {
bean.setOffsetMsgId(line[12]);
pubContext.setSuccess(Boolean.parseBoolean(line[13]));
}
pubContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
pubContext.getTraceBeans().add(bean);
resList.add(pubContext);
} else if (line[0].equals(TrackTraceType.SubBefore.name())) {
TrackTraceContext subBeforeContext = new TrackTraceContext();
subBeforeContext.setTraceType(TrackTraceType.SubBefore);
subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
subBeforeContext.setRegionId(line[2]);
subBeforeContext.setGroupName(line[3]);
subBeforeContext.setRequestId(line[4]);
TrackTraceBean bean = new TrackTraceBean();
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
subBeforeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
} else if (line[0].equals(TrackTraceType.SubAfter.name())) {
TrackTraceContext subAfterContext = new TrackTraceContext();
subAfterContext.setTraceType(TrackTraceType.SubAfter);
subAfterContext.setRequestId(line[1]);
TrackTraceBean bean = new TrackTraceBean();
bean.setMsgId(line[2]);
bean.setKeys(line[5]);
subAfterContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
subAfterContext.getTraceBeans().add(bean);
subAfterContext.setCostTime(Integer.parseInt(line[3]));
subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
if (line.length >= 7) {
// add the context type
subAfterContext.setContextCode(Integer.parseInt(line[6]));
}
resList.add(subAfterContext);
}
}
return resList;
}
/**
* Encoding the trace context into track data strings and keyset sets
*
* @param ctx
* @return
*/
public static TrackTraceTransferBean encoderFromContextBean(TrackTraceContext ctx) {
if (ctx == null) {
return null;
}
//build message track trace of the transfering entity content bean
TrackTraceTransferBean transferBean = new TrackTraceTransferBean();
StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) {
case Pub: {
TrackTraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getBodyLength()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TrackTraceConstants.FIELD_SPLITOR);
}
break;
case SubBefore: {
for (TrackTraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.FIELD_SPLITOR);//
}
}
break;
case SubAfter: {
for (TrackTraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TrackTraceConstants.FIELD_SPLITOR);
}
}
break;
default:
}
transferBean.setTransData(sb.toString());
for (TrackTraceBean bean : ctx.getTraceBeans()) {
transferBean.getTransKey().add(bean.getMsgId());
if (bean.getKeys() != null && bean.getKeys().length() > 0) {
transferBean.getTransKey().add(bean.getKeys());
}
}
return transferBean;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
public enum TrackTraceDispatcherType {
PRODUCER,
CONSUMER
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
import java.util.HashSet;
import java.util.Set;
/**
* track trace transfering bean
*/
public class TrackTraceTransferBean {
private String transData;
private Set<String> transKey = new HashSet<String>();
public String getTransData() {
return transData;
}
public void setTransData(String transData) {
this.transData = transData;
}
public Set<String> getTransKey() {
return transKey;
}
public void setTransKey(Set<String> transKey) {
this.transKey = transKey;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.common;
public enum TrackTraceType {
Pub,
SubBefore,
SubAfter,
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.dispatch;
import java.util.Properties;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
/**
* Interface of asynchronous transfer data
*/
public interface AsyncDispatcher {
/**
* Initialize asynchronous transfer data module
*/
void start(Properties properties) throws MQClientException;
/**
* append the transfering data
* @param ctx data infomation
* @return
*/
boolean append(Object ctx);
/**
* write flush action
*
* @throws IOException
*/
void flush() throws IOException;
/**
* close the track trace Hook
*/
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.dispatch.impl;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder;
import org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import java.io.IOException;
import java.util.List;
import java.util.HashMap;
import java.util.UUID;
import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by zongtanghu on 2018/11/6.
*/
public class AsyncArrayDispatcher implements AsyncDispatcher {
private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter;
// the last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TrackTraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private String dispatcherType;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
public AsyncArrayDispatcher(Properties properties) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value
queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
this.queueSize = queueSize;
batchSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_BATCH_NUM, "1"));
this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
}
public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
public DefaultMQProducerImpl getHostProducer() {
return hostProducer;
}
public void setHostProducer(DefaultMQProducerImpl hostProducer) {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}
public void start(Properties properties) throws MQClientException {
TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TrackTraceContext) ctx);
if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}
@Override
public void flush() throws IOException {
// the maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}
@Override
public void shutdown() {
this.stopped = true;
this.traceExecuter.shutdown();
TrackTraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
this.removeShutdownHook();
}
public void registerShutDownHook() {
if (shutDownHook == null) {
shutDownHook = new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
try {
flush();
} catch (IOException e) {
log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
}
}
}
}
}, "ShutdownHookMQTrace");
Runtime.getRuntime().addShutdownHook(shutDownHook);
}
}
public void removeShutdownHook() {
if (shutDownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
}
}
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<TrackTraceContext> contexts = new ArrayList<TrackTraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TrackTraceContext context = null;
try {
//get track trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecuter.submit(request);
} else if (AsyncArrayDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
class AsyncAppenderRequest implements Runnable {
List<TrackTraceContext> contextList;
public AsyncAppenderRequest(final List<TrackTraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TrackTraceContext>(1);
}
}
@Override
public void run() {
sendTraceData(contextList);
}
public void sendTraceData(List<TrackTraceContext> contextList) {
Map<String, List<TrackTraceTransferBean>> transBeanMap = new HashMap<String, List<TrackTraceTransferBean>>();
for (TrackTraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
//1.topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
//2.use original message entity's topic as key
String key = topic;
List<TrackTraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TrackTraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TrackTraceTransferBean traceData = TrackTraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TrackTraceTransferBean>> entry : transBeanMap.entrySet()) {
//key -> dataTopic(Not trace Topic)
String dataTopic = entry.getKey();
flushData(entry.getValue(), dataTopic);
}
}
/**
* batch sending data actually
*/
private void flushData(List<TrackTraceTransferBean> transBeanList, String topic) {
if (transBeanList.size() == 0) {
return;
}
// temporary buffer
StringBuilder buffer = new StringBuilder(1024);
int count = 0;
Set<String> keySet = new HashSet<String>();
for (TrackTraceTransferBean bean : transBeanList) {
// keyset of message track trace includes msgId of or original message
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString());
// clear temporary buffer after finishing
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString());
}
transBeanList.clear();
}
/**
* send message track trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message track trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = TrackTraceConstants.TRACE_TOPIC;
final Message message = new Message(topic, data.getBytes());
//keyset of message track trace includes msgId of or original message
message.setKeys(keySet);
try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
log.info("send trace data ,the traceData is " + data);
}
};
if (traceBrokerSet.isEmpty()) {
//no cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, traceBrokerSet, callback);
}
} catch (Exception e) {
log.info("send trace data,the traceData is" + data);
}
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
Set<String> brokerSet = new HashSet<String>();
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
brokerSet.add(queue.getBrokerName());
}
}
return brokerSet;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.dispatch.impl;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class TrackTraceProducerFactory {
private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
private static AtomicBoolean isStarted = new AtomicBoolean(false);
private static DefaultMQProducer traceProducer;
public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
if (traceProducer == null) {
traceProducer = new DefaultMQProducer();
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
String nameSrv = properties.getProperty(TrackTraceConstants.NAMESRV_ADDR);
if (nameSrv == null) {
TopAddressing topAddressing = new TopAddressing(properties.getProperty(TrackTraceConstants.ADDRSRV_URL));
nameSrv = topAddressing.fetchNSAddr();
}
traceProducer.setNamesrvAddr(nameSrv);
traceProducer.setVipChannelEnabled(false);
//the max size of message is 128K
int maxSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_MSG_SIZE, "128000"));
traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
}
return traceProducer;
}
public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException {
dispatcherTable.put(dispatcherId, new Object());
if (traceProducer != null && isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.start();
}
}
public static void unregisterTraceDispatcher(String dispatcherId) {
dispatcherTable.remove(dispatcherId);
if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) {
traceProducer.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.hook;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList;
import java.util.List;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
private AsyncDispatcher localDispatcher;
public ConsumeMessageTraceHookImpl(AsyncDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "ConsumeMessageTraceHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TrackTraceContext traceContext = new TrackTraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TrackTraceType.SubBefore);//
traceContext.setGroupName(context.getConsumerGroup());//
List<TrackTraceBean> beans = new ArrayList<TrackTraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (traceOn != null && traceOn.equals("false")) {
// if trace switch is false ,skip it
continue;
}
TrackTraceBean traceBean = new TrackTraceBean();
traceBean.setTopic(msg.getTopic());//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TrackTraceContext subBeforeContext = (TrackTraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// if subbefore bean is null ,skip it
return;
}
TrackTraceContext subAfterContext = new TrackTraceContext();
subAfterContext.setTraceType(TrackTraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(subBeforeContext.getGroupName());//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
//caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
localDispatcher.append(subAfterContext);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.core.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.common.MixAll;
import java.util.ArrayList;
public class SendMessageTrackHookImpl implements SendMessageHook {
private AsyncDispatcher localDispatcher;
public SendMessageTrackHookImpl(AsyncDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "SendMessageTrackHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
return;
}
//build the context content of TuxeTraceContext
TrackTraceContext tuxeContext = new TrackTraceContext();
tuxeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TrackTraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message track trace
TrackTraceBean traceBean = new TrackTraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC)
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}
TrackTraceContext tuxeContext = (TrackTraceContext) context.getMqTraceContext();
TrackTraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
localDispatcher.append(tuxeContext);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private DefaultMQPushConsumer normalPushConsumer;
private AsyncArrayDispatcher asyncArrayDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer;
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true);
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
asyncArrayDispatcher = (AsyncArrayDispatcher)pushConsumer.getTraceDispatcher();
traceProducer = asyncArrayDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
fieldTrace = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
fieldTrace.setAccessible(true);
fieldTrace.set(mQClientTraceFactory, mQClientTraceAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@After
public void terminate() {
pushConsumer.shutdown();
}
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
//when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
//when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return null;
}
}));
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTraceTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-trace");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10912");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-trace");
queueData.setPerm(6);
queueData.setReadQueueNums(1);
queueData.setWriteQueueNums(1);
queueData.setTopicSynFlag(1);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Spy
private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private AsyncArrayDispatcher asyncArrayDispatcher;
private DefaultMQProducer producer;
private DefaultMQProducer traceProducer;
private DefaultMQProducer normalProducer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();
@Before
public void init() throws Exception {
normalProducer = new DefaultMQProducer(producerGroupTemp,false);
producer = new DefaultMQProducer(producerGroupTemp,true);
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
message = new Message(topic, new byte[] {'a', 'b' ,'c'});
asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher();
traceProducer = asyncArrayDispatcher.getTraceProducer();
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientTraceFactory, mQClientTraceAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
}catch (MQClientException e){
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@Test
public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
}catch (MQClientException e){
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@After
public void terminate() {
producer.shutdown();
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTraceTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-trace");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10912");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-trace");
queueData.setPerm(6);
queueData.setReadQueueNums(1);
queueData.setWriteQueueNums(1);
queueData.setTopicSynFlag(1);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
}
......@@ -51,7 +51,8 @@ public class BrokerConfig {
@ImportantField
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
@ImportantField
private boolean autoTraceBrokerEnable = false;
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
......@@ -732,4 +733,12 @@ public class BrokerConfig {
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
public boolean isAutoTraceBrokerEnable() {
return autoTraceBrokerEnable;
}
public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) {
this.autoTraceBrokerEnable = autoTraceBrokerEnable;
}
}
......@@ -82,7 +82,8 @@ public class MixAll {
public static final long CURRENT_JVM_PID = getPID();
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String TRACE_BROKER_NAME_SUFFIX = "trace";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
......@@ -90,6 +91,7 @@ public class MixAll {
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
......
......@@ -19,3 +19,4 @@ deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
autoTraceBrokerEnable=false
\ No newline at end of file
......@@ -19,3 +19,4 @@ deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
autoTraceBrokerEnable=false
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
brokerName=broker-trace
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
autoTraceBrokerEnable=true
\ No newline at end of file
......@@ -26,7 +26,6 @@ public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
......
......@@ -29,10 +29,10 @@ public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("Jodie_topic_1023", "*");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20170422221800");
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.tracemessage;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TraceProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.tracemessage;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
......@@ -63,7 +63,7 @@ public class ProducerInstance {
return p;
}
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null;
beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
......
......@@ -39,7 +39,7 @@ public class AbstractTestCase {
@Before
public void mockLoggerAppender() throws Exception {
DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender"));
DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
......
......@@ -46,7 +46,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
}
public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer = new DefaultMQPushConsumer(consumerGroup,false);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
try {
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.util.RandomUtil;
public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false);
producer.setNamesrvAddr(ns);
try {
producer.start();
......
......@@ -213,7 +213,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById");
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById",false);
defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
......
......@@ -65,7 +65,7 @@ public class MonitorService {
private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
MixAll.TOOLS_CONSUMER_GROUP);
private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
MixAll.MONITOR_CONSUMER_GROUP);
MixAll.MONITOR_CONSUMER_GROUP,false);
public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) {
this.monitorConfig = monitorConfig;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册