diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 73fe43942709136ec2d94c879feb66b9a0e23286..9771ec9a9e099fafc67667d57f869d743ec1b115 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -228,6 +228,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is user self defined topic and this node is trace broker!"; + log.warn(errorMsg); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMsg); + return response; + } + try { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 59c7895eb729566a0b159b149d5846051d07ada7..4ce996b2cfc494d5be03d7bb51312a70b358bd4e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -124,6 +124,17 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } + { + // MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { + String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + } } public boolean isSystemTopic(final String topic) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index d51030a159240cd9b34aa4dfd9ca438b013cf577..5ee2e33ffbeb5cb35908a1bbc5e25f47f5e3f0c8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -29,6 +30,12 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; +import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; +import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -36,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; */ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { + private final InternalLogger log = ClientLogger.getLog(); + /** * Internal implementation. Most of the functions herein are delegated to it. */ @@ -246,11 +256,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private long consumeTimeout = 15; + /** + * Interface of asynchronous transfer data + */ + private AsyncDispatcher traceDispatcher = null; + /** * Default constructor. */ public DefaultMQPushConsumer() { - this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely(), false); } /** @@ -261,10 +276,29 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param allocateMessageQueueStrategy message queue allocating algorithm. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + //if client open the message track trace feature + if (msgTraceSwitch) { + try { + Properties tempProperties = new Properties(); + tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); + tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); + tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); + tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); + traceDispatcher = dispatcher; + + this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } } /** @@ -273,7 +307,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultMQPushConsumer(RPCHook rpcHook) { - this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely(),false); } /** @@ -281,8 +315,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * * @param consumerGroup Consumer group. */ - public DefaultMQPushConsumer(final String consumerGroup) { - this(consumerGroup, null, new AllocateMessageQueueAveragely()); + public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(),msgTraceSwitch); } @Override @@ -518,6 +552,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Override public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); + if (null != traceDispatcher) { + try { + Properties tempProperties = new Properties(); + tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); + traceDispatcher.start(tempProperties); + } catch (MQClientException e) { + log.warn("trace dispatcher start failed ", e); + } + } } /** @@ -526,6 +569,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Override public void shutdown() { this.defaultMQPushConsumerImpl.shutdown(); + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } } @Override @@ -694,4 +740,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public void setConsumeTimeout(final long consumeTimeout) { this.consumeTimeout = consumeTimeout; } + + public AsyncDispatcher getTraceDispatcher() { + return traceDispatcher; + } + + public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { + this.traceDispatcher = traceDispatcher; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9ffaed0a4f9086a87d88603f00f0335330af560d..c35d9468def2c3bbe20639dcfcf815329418d930 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -144,7 +144,7 @@ public class MQClientInstance { this.rebalanceService = new RebalanceService(this); - this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); + this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP,false); this.defaultMQProducer.resetClientConfig(clientConfig); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 9732d0eb84458062d203db0af55df8e37043b1a3..a2bacfb9071880dd2b4eafe2dc0f6783b54a9be6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer; import java.util.Collection; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -25,6 +26,12 @@ import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; +import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; +import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; @@ -33,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; @@ -56,6 +64,8 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient; */ public class DefaultMQProducer extends ClientConfig implements MQProducer { + private final InternalLogger log = ClientLogger.getLog(); + /** * Wrapping internal implementations for virtually all methods presented in this class. */ @@ -119,11 +129,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private int maxMessageSize = 1024 * 1024 * 4; // 4M + /** + * Interface of asynchronous transfer data + */ + private AsyncDispatcher traceDispatcher = null; + /** * Default constructor. */ public DefaultMQProducer() { - this(MixAll.DEFAULT_PRODUCER_GROUP, null); + this(MixAll.DEFAULT_PRODUCER_GROUP, null,false); } /** @@ -131,10 +146,30 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. + * @param msgTraceSwitch switch flag instance for message track trace */ - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + //if client open the message track trace feature + if (msgTraceSwitch) { + try { + Properties tempProperties = new Properties(); + tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); + tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); + tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); + tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); + traceDispatcher = dispatcher; + + this.getDefaultMQProducerImpl().registerSendMessageHook( + new SendMessageTrackHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } } /** @@ -142,17 +177,17 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. */ - public DefaultMQProducer(final String producerGroup) { - this(producerGroup, null); + public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) { + this(producerGroup, null,msgTraceSwitch); } /** * Constructor specifying the RPC hook. - * + * * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(RPCHook rpcHook) { - this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook,false); } /** @@ -170,6 +205,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); + if (null != traceDispatcher) { + try { + Properties tempProperties = new Properties(); + tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); + traceDispatcher.start(tempProperties); + } catch (MQClientException e) { + log.warn("trace dispatcher start failed ", e); + } + } } /** @@ -178,6 +222,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void shutdown() { this.defaultMQProducerImpl.shutdown(); + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } } /** @@ -777,4 +824,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) { this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; } + + public AsyncDispatcher getTraceDispatcher() { + return traceDispatcher; + } + + public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { + this.traceDispatcher = traceDispatcher; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 8f6428b29c2ae700547d0b082a8d13d608e35bd1..f6030c9d26bf19fafdd8997fa981d898a87d2124 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -35,11 +35,11 @@ public class TransactionMQProducer extends DefaultMQProducer { } public TransactionMQProducer(final String producerGroup) { - super(producerGroup); + super(producerGroup,false); } public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - super(producerGroup, rpcHook); + super(producerGroup, rpcHook,false); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..7d1d75b0dd57035685432a7411f5fb1850deb065 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.Utils; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Enumeration; + +/** + * Track Trace Util + */ +public class TrackTraceUtils { + + public static String getLocalAddress() { + try { + //Traverse the network bind card to find a valid IP address and return it. + Enumeration enumeration = NetworkInterface.getNetworkInterfaces(); + ArrayList ipv4Result = new ArrayList(); + ArrayList ipv6Result = new ArrayList(); + while (enumeration.hasMoreElements()) { + final NetworkInterface networkInterface = enumeration.nextElement(); + final Enumeration en = networkInterface.getInetAddresses(); + while (en.hasMoreElements()) { + final InetAddress address = en.nextElement(); + if (!address.isLoopbackAddress()) { + if (address instanceof Inet6Address) { + ipv6Result.add(normalizeHostAddress(address)); + } else { + ipv4Result.add(normalizeHostAddress(address)); + } + } + } + } + + // get priority to IPv4 + if (!ipv4Result.isEmpty()) { + for (String ip : ipv4Result) { + if (ip.startsWith("127.0") || ip.startsWith("192.168")) { + continue; + } + + return ip; + } + + //get the last one + return ipv4Result.get(ipv4Result.size() - 1); + } + //then use the ipv6 address + else if (!ipv6Result.isEmpty()) { + return ipv6Result.get(0); + } + //the use local ip address + final InetAddress localHost = InetAddress.getLocalHost(); + return normalizeHostAddress(localHost); + } catch (SocketException e) { + e.printStackTrace(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } finally { + + } + + return null; + } + + public static String normalizeHostAddress(final InetAddress localHost) { + if (localHost instanceof Inet6Address) { + return "[" + localHost.getHostAddress() + "]"; + } else { + return localHost.getHostAddress(); + } + } + + public static String toJson(final Object obj, boolean prettyFormat) { + return RemotingSerializable.toJson(obj, prettyFormat); + } + + public static T fromJson(String json, Class classOfT) { + return RemotingSerializable.fromJson(json, classOfT); + } + + public static String replaceNull(String ori) { + return ori == null ? "" : ori; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java new file mode 100644 index 0000000000000000000000000000000000000000..4981759eba76a81705cea594845dcf50ca23320e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +import org.apache.rocketmq.client.trace.core.Utils.TrackTraceUtils; +import org.apache.rocketmq.common.message.MessageType; + +public class TrackTraceBean { + private static final String LOCAL_ADDRESS = TrackTraceUtils.getLocalAddress(); + private String topic = ""; + private String msgId = ""; + private String offsetMsgId = ""; + private String tags = ""; + private String keys = ""; + private String storeHost = LOCAL_ADDRESS; + private String clientHost = LOCAL_ADDRESS; + private long storeTime; + private int retryTimes; + private int bodyLength; + private MessageType msgType; + + + public MessageType getMsgType() { + return msgType; + } + + + public void setMsgType(final MessageType msgType) { + this.msgType = msgType; + } + + + public String getOffsetMsgId() { + return offsetMsgId; + } + + + public void setOffsetMsgId(final String offsetMsgId) { + this.offsetMsgId = offsetMsgId; + } + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public String getTags() { + return tags; + } + + + public void setTags(String tags) { + this.tags = tags; + } + + + public String getKeys() { + return keys; + } + + + public void setKeys(String keys) { + this.keys = keys; + } + + + public String getStoreHost() { + return storeHost; + } + + + public void setStoreHost(String storeHost) { + this.storeHost = storeHost; + } + + + public String getClientHost() { + return clientHost; + } + + + public void setClientHost(String clientHost) { + this.clientHost = clientHost; + } + + + public long getStoreTime() { + return storeTime; + } + + + public void setStoreTime(long storeTime) { + this.storeTime = storeTime; + } + + + public int getRetryTimes() { + return retryTimes; + } + + + public void setRetryTimes(int retryTimes) { + this.retryTimes = retryTimes; + } + + + public int getBodyLength() { + return bodyLength; + } + + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..aa49d1c5bb4e5b28a8ff4097e8152daea8376b1d --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +import org.apache.rocketmq.common.MixAll; + +public class TrackTraceConstants { + public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; + public static final String ADDRSRV_URL = "ADDRSRV_URL"; + public static final String INSTANCE_NAME = "InstanceName"; + public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize"; + public static final String MAX_BATCH_NUM = "MaxBatchNum"; + public static final String WAKE_UP_NUM = "WakeUpNum"; + public static final String MAX_MSG_SIZE = "MaxMsgSize"; + public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; + public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + public static final char CONTENT_SPLITOR = (char) 1; + public static final char FIELD_SPLITOR = (char) 2; + public static final String TRACE_DISPATCHER_TYPE = "DispatcherType"; +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java new file mode 100644 index 0000000000000000000000000000000000000000..a6374a6d58e5a16a9a83dcb48806251faac0dc49 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +import org.apache.rocketmq.common.message.MessageClientIDSetter; + +import java.util.List; + +/** + * The context of Track Trace + */ +public class TrackTraceContext implements Comparable { + + private TrackTraceType traceType; + private long timeStamp = System.currentTimeMillis(); + private String regionId = ""; + private String regionName = ""; + private String groupName = ""; + private int costTime = 0; + private boolean isSuccess = true; + private String requestId = MessageClientIDSetter.createUniqID(); + private int contextCode = 0; + private List traceBeans; + + public int getContextCode() { + return contextCode; + } + + public void setContextCode(final int contextCode) { + this.contextCode = contextCode; + } + + public List getTraceBeans() { + return traceBeans; + } + + public void setTraceBeans(List traceBeans) { + this.traceBeans = traceBeans; + } + + public String getRegionId() { + return regionId; + } + + public void setRegionId(String regionId) { + this.regionId = regionId; + } + + public TrackTraceType getTraceType() { + return traceType; + } + + public void setTraceType(TrackTraceType traceType) { + this.traceType = traceType; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public int getCostTime() { + return costTime; + } + + public void setCostTime(int costTime) { + this.costTime = costTime; + } + + public boolean isSuccess() { + return isSuccess; + } + + public void setSuccess(boolean success) { + isSuccess = success; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getRegionName() { + return regionName; + } + + public void setRegionName(String regionName) { + this.regionName = regionName; + } + + @Override + public int compareTo(TrackTraceContext o) { + return (int) (this.timeStamp - o.getTimeStamp()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(1024); + sb.append(traceType).append("_").append(groupName) + .append("_").append(regionId).append("_").append(isSuccess).append("_"); + if (traceBeans != null && traceBeans.size() > 0) { + for (TrackTraceBean bean : traceBeans) { + sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_"); + } + } + return "TrackTraceContext{" + sb.toString() + '}'; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..3362106e6259b5dba22f6dc60740cf814223bad3 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +import org.apache.rocketmq.common.message.MessageType; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.rocketmq.client.trace.core.common.TrackTraceType.Pub; + +/** + * encode/decode for Track Trace Data + */ +public class TrackTraceDataEncoder { + + /** + * resolving traceContext list From track trace data String + * + * @param traceData + * @return + */ + public static List decoderFromTraceDataString(String traceData) { + List resList = new ArrayList(); + if (traceData == null || traceData.length() <= 0) { + return resList; + } + String[] contextList = traceData.split(String.valueOf(TrackTraceConstants.FIELD_SPLITOR)); + for (String context : contextList) { + String[] line = context.split(String.valueOf(TrackTraceConstants.CONTENT_SPLITOR)); + if (line[0].equals(Pub.name())) { + TrackTraceContext pubContext = new TrackTraceContext(); + pubContext.setTraceType(Pub); + pubContext.setTimeStamp(Long.parseLong(line[1])); + pubContext.setRegionId(line[2]); + pubContext.setGroupName(line[3]); + TrackTraceBean bean = new TrackTraceBean(); + bean.setTopic(line[4]); + bean.setMsgId(line[5]); + bean.setTags(line[6]); + bean.setKeys(line[7]); + bean.setStoreHost(line[8]); + bean.setBodyLength(Integer.parseInt(line[9])); + pubContext.setCostTime(Integer.parseInt(line[10])); + bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]); + + if (line.length == 13) { + pubContext.setSuccess(Boolean.parseBoolean(line[12])); + } else if (line.length == 14) { + bean.setOffsetMsgId(line[12]); + pubContext.setSuccess(Boolean.parseBoolean(line[13])); + } + pubContext.setTraceBeans(new ArrayList(1)); + pubContext.getTraceBeans().add(bean); + resList.add(pubContext); + } else if (line[0].equals(TrackTraceType.SubBefore.name())) { + TrackTraceContext subBeforeContext = new TrackTraceContext(); + subBeforeContext.setTraceType(TrackTraceType.SubBefore); + subBeforeContext.setTimeStamp(Long.parseLong(line[1])); + subBeforeContext.setRegionId(line[2]); + subBeforeContext.setGroupName(line[3]); + subBeforeContext.setRequestId(line[4]); + TrackTraceBean bean = new TrackTraceBean(); + bean.setMsgId(line[5]); + bean.setRetryTimes(Integer.parseInt(line[6])); + bean.setKeys(line[7]); + subBeforeContext.setTraceBeans(new ArrayList(1)); + subBeforeContext.getTraceBeans().add(bean); + resList.add(subBeforeContext); + } else if (line[0].equals(TrackTraceType.SubAfter.name())) { + TrackTraceContext subAfterContext = new TrackTraceContext(); + subAfterContext.setTraceType(TrackTraceType.SubAfter); + subAfterContext.setRequestId(line[1]); + TrackTraceBean bean = new TrackTraceBean(); + bean.setMsgId(line[2]); + bean.setKeys(line[5]); + subAfterContext.setTraceBeans(new ArrayList(1)); + subAfterContext.getTraceBeans().add(bean); + subAfterContext.setCostTime(Integer.parseInt(line[3])); + subAfterContext.setSuccess(Boolean.parseBoolean(line[4])); + if (line.length >= 7) { + // add the context type + subAfterContext.setContextCode(Integer.parseInt(line[6])); + } + resList.add(subAfterContext); + } + } + return resList; + } + + /** + * Encoding the trace context into track data strings and keyset sets + * + * @param ctx + * @return + */ + public static TrackTraceTransferBean encoderFromContextBean(TrackTraceContext ctx) { + if (ctx == null) { + return null; + } + //build message track trace of the transfering entity content bean + TrackTraceTransferBean transferBean = new TrackTraceTransferBean(); + StringBuilder sb = new StringBuilder(256); + switch (ctx.getTraceType()) { + case Pub: { + TrackTraceBean bean = ctx.getTraceBeans().get(0); + //append the content of context and traceBean to transferBean's TransData + sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getTopic()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getTags()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getStoreHost()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getBodyLength()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgType().ordinal()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getOffsetMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(TrackTraceConstants.FIELD_SPLITOR); + } + break; + case SubBefore: { + for (TrackTraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getRetryTimes()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TrackTraceConstants.FIELD_SPLITOR);// + } + } + break; + case SubAfter: { + for (TrackTraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getContextCode()).append(TrackTraceConstants.FIELD_SPLITOR); + } + } + break; + default: + } + transferBean.setTransData(sb.toString()); + for (TrackTraceBean bean : ctx.getTraceBeans()) { + + transferBean.getTransKey().add(bean.getMsgId()); + if (bean.getKeys() != null && bean.getKeys().length() > 0) { + transferBean.getTransKey().add(bean.getKeys()); + } + } + return transferBean; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java new file mode 100644 index 0000000000000000000000000000000000000000..a22198e81bd48f2cf60c7c950898930c1628fe5e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +public enum TrackTraceDispatcherType { + PRODUCER, + CONSUMER +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java new file mode 100644 index 0000000000000000000000000000000000000000..9535b5d472a55975aa79b4c107e9d88cc4f6ff54 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +import java.util.HashSet; +import java.util.Set; + +/** + * track trace transfering bean + */ +public class TrackTraceTransferBean { + private String transData; + private Set transKey = new HashSet(); + + public String getTransData() { + return transData; + } + + public void setTransData(String transData) { + this.transData = transData; + } + + public Set getTransKey() { + return transKey; + } + + public void setTransKey(Set transKey) { + this.transKey = transKey; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java new file mode 100644 index 0000000000000000000000000000000000000000..31a89ef655f20afec1e0c05a0ec4df567f04cac4 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.common; + +/** + * Created by zongtanghu on 2018/11/6. + */ +public enum TrackTraceType { + Pub, + SubBefore, + SubAfter, +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..9b282bb0b1bfd3491f206301b2140a2340333e01 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.dispatch; + +import java.util.Properties; +import org.apache.rocketmq.client.exception.MQClientException; +import java.io.IOException; + +/** + * Interface of asynchronous transfer data + */ +public interface AsyncDispatcher { + + /** + * Initialize asynchronous transfer data module + */ + void start(Properties properties) throws MQClientException; + + /** + * append the transfering data + * @param ctx data infomation + * @return + */ + boolean append(Object ctx); + + /** + * write flush action + * + * @throws IOException + */ + void flush() throws IOException; + + /** + * close the track trace Hook + */ + void shutdown(); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..1c1c4c3813fbef7c5b84a1f9fd29e2ab18752f12 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.dispatch.impl; + +import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; +import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; +import org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder; +import org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean; +import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; + +import java.io.IOException; +import java.util.List; +import java.util.HashMap; +import java.util.UUID; +import java.util.Properties; +import java.util.Set; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Created by zongtanghu on 2018/11/6. + */ +public class AsyncArrayDispatcher implements AsyncDispatcher { + + private final static InternalLogger log = ClientLogger.getLog(); + private final int queueSize; + private final int batchSize; + private final DefaultMQProducer traceProducer; + private final ThreadPoolExecutor traceExecuter; + // the last discard number of log + private AtomicLong discardCount; + private Thread worker; + private ArrayBlockingQueue traceContextQueue; + private ArrayBlockingQueue appenderQueue; + private volatile Thread shutDownHook; + private volatile boolean stopped = false; + private String dispatcherType; + private DefaultMQProducerImpl hostProducer; + private DefaultMQPushConsumerImpl hostConsumer; + private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); + private String dispatcherId = UUID.randomUUID().toString(); + + public AsyncArrayDispatcher(Properties properties) throws MQClientException { + dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); + int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); + // queueSize is greater than or equal to the n power of 2 of value + queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); + this.queueSize = queueSize; + batchSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_BATCH_NUM, "1")); + this.discardCount = new AtomicLong(0L); + traceContextQueue = new ArrayBlockingQueue(1024); + appenderQueue = new ArrayBlockingQueue(queueSize); + + this.traceExecuter = new ThreadPoolExecutor(// + 10, // + 20, // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.appenderQueue, // + new ThreadFactoryImpl("MQTraceSendThread_")); + traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties); + } + + public DefaultMQProducer getTraceProducer() { + return traceProducer; + } + + public DefaultMQProducerImpl getHostProducer() { + return hostProducer; + } + + public void setHostProducer(DefaultMQProducerImpl hostProducer) { + this.hostProducer = hostProducer; + } + + public DefaultMQPushConsumerImpl getHostConsumer() { + return hostConsumer; + } + + public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) { + this.hostConsumer = hostConsumer; + } + + public void start(Properties properties) throws MQClientException { + TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR)); + this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId); + this.worker.setDaemon(true); + this.worker.start(); + this.registerShutDownHook(); + } + + @Override + public boolean append(final Object ctx) { + boolean result = traceContextQueue.offer((TrackTraceContext) ctx); + if (!result) { + log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); + } + return result; + } + + @Override + public void flush() throws IOException { + // the maximum waiting time for refresh,avoid being written all the time, resulting in failure to return. + long end = System.currentTimeMillis() + 500; + while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + break; + } + } + log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size()); + } + + @Override + public void shutdown() { + this.stopped = true; + this.traceExecuter.shutdown(); + TrackTraceProducerFactory.unregisterTraceDispatcher(dispatcherId); + this.removeShutdownHook(); + } + + public void registerShutDownHook() { + if (shutDownHook == null) { + shutDownHook = new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + + @Override + public void run() { + synchronized (this) { + if (!this.hasShutdown) { + try { + flush(); + } catch (IOException e) { + log.error("system MQTrace hook shutdown failed ,maybe loss some trace data"); + } + } + } + } + }, "ShutdownHookMQTrace"); + Runtime.getRuntime().addShutdownHook(shutDownHook); + } + } + + public void removeShutdownHook() { + if (shutDownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutDownHook); + } + } + + class AsyncRunnable implements Runnable { + private boolean stopped; + + @Override + public void run() { + while (!stopped) { + List contexts = new ArrayList(batchSize); + for (int i = 0; i < batchSize; i++) { + TrackTraceContext context = null; + try { + //get track trace data element from blocking Queue — traceContextQueue + context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + if (context != null) { + contexts.add(context); + } else { + break; + } + } + if (contexts.size() > 0) { + AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); + traceExecuter.submit(request); + } else if (AsyncArrayDispatcher.this.stopped) { + this.stopped = true; + } + } + + } + } + + class AsyncAppenderRequest implements Runnable { + List contextList; + + public AsyncAppenderRequest(final List contextList) { + if (contextList != null) { + this.contextList = contextList; + } else { + this.contextList = new ArrayList(1); + } + } + + @Override + public void run() { + sendTraceData(contextList); + } + + public void sendTraceData(List contextList) { + Map> transBeanMap = new HashMap>(); + for (TrackTraceContext context : contextList) { + if (context.getTraceBeans().isEmpty()) { + continue; + } + //1.topic value corresponding to original message entity content + String topic = context.getTraceBeans().get(0).getTopic(); + //2.use original message entity's topic as key + String key = topic; + List transBeanList = transBeanMap.get(key); + if (transBeanList == null) { + transBeanList = new ArrayList(); + transBeanMap.put(key, transBeanList); + } + TrackTraceTransferBean traceData = TrackTraceDataEncoder.encoderFromContextBean(context); + transBeanList.add(traceData); + } + for (Map.Entry> entry : transBeanMap.entrySet()) { + //key -> dataTopic(Not trace Topic) + String dataTopic = entry.getKey(); + flushData(entry.getValue(), dataTopic); + } + } + + /** + * batch sending data actually + */ + private void flushData(List transBeanList, String topic) { + if (transBeanList.size() == 0) { + return; + } + // temporary buffer + StringBuilder buffer = new StringBuilder(1024); + int count = 0; + Set keySet = new HashSet(); + + for (TrackTraceTransferBean bean : transBeanList) { + // keyset of message track trace includes msgId of or original message + keySet.addAll(bean.getTransKey()); + buffer.append(bean.getTransData()); + count++; + // Ensure that the size of the package should not exceed the upper limit. + if (buffer.length() >= traceProducer.getMaxMessageSize()) { + sendTraceDataByMQ(keySet, buffer.toString()); + // clear temporary buffer after finishing + buffer.delete(0, buffer.length()); + keySet.clear(); + count = 0; + } + } + if (count > 0) { + sendTraceDataByMQ(keySet, buffer.toString()); + } + transBeanList.clear(); + } + + /** + * send message track trace data + * + * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) + * @param data the message track trace data in this batch + */ + private void sendTraceDataByMQ(Set keySet, final String data) { + String topic = TrackTraceConstants.TRACE_TOPIC; + final Message message = new Message(topic, data.getBytes()); + + //keyset of message track trace includes msgId of or original message + message.setKeys(keySet); + try { + Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); + SendCallback callback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + + @Override + public void onException(Throwable e) { + log.info("send trace data ,the traceData is " + data); + } + }; + if (traceBrokerSet.isEmpty()) { + //no cross set + traceProducer.send(message, callback, 5000); + } else { + traceProducer.send(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Set brokerSet = (Set) arg; + List filterMqs = new ArrayList(); + for (MessageQueue queue : mqs) { + if (brokerSet.contains(queue.getBrokerName())) { + filterMqs.add(queue); + } + } + int index = sendWhichQueue.getAndIncrement(); + int pos = Math.abs(index) % filterMqs.size(); + if (pos < 0) { + pos = 0; + } + return filterMqs.get(pos); + } + }, traceBrokerSet, callback); + } + + } catch (Exception e) { + log.info("send trace data,the traceData is" + data); + } + } + + private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) { + Set brokerSet = new HashSet(); + TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo()); + producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + topicPublishInfo = producer.getTopicPublishInfoTable().get(topic); + } + if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { + for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) { + brokerSet.add(queue.getBrokerName()); + } + } + return brokerSet; + } + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..27447df7340841f35906c1659f8895d58785cd06 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.dispatch.impl; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; +import org.apache.rocketmq.common.namesrv.TopAddressing; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TrackTraceProducerFactory { + + private static Map dispatcherTable = new ConcurrentHashMap(); + private static AtomicBoolean isStarted = new AtomicBoolean(false); + private static DefaultMQProducer traceProducer; + + + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { + if (traceProducer == null) { + + traceProducer = new DefaultMQProducer(); + traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); + traceProducer.setSendMsgTimeout(5000); + traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); + String nameSrv = properties.getProperty(TrackTraceConstants.NAMESRV_ADDR); + if (nameSrv == null) { + TopAddressing topAddressing = new TopAddressing(properties.getProperty(TrackTraceConstants.ADDRSRV_URL)); + nameSrv = topAddressing.fetchNSAddr(); + } + traceProducer.setNamesrvAddr(nameSrv); + traceProducer.setVipChannelEnabled(false); + //the max size of message is 128K + int maxSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_MSG_SIZE, "128000")); + traceProducer.setMaxMessageSize(maxSize - 10 * 1000); + } + return traceProducer; + } + + public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException { + dispatcherTable.put(dispatcherId, new Object()); + if (traceProducer != null && isStarted.compareAndSet(false, true)) { + traceProducer.setNamesrvAddr(nameSrvAddr); + traceProducer.start(); + } + } + + public static void unregisterTraceDispatcher(String dispatcherId) { + dispatcherTable.remove(dispatcherId); + if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) { + traceProducer.shutdown(); + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..df88ce2d57699c6322a5451513d729ddf0c3340f --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.hook; + +import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; +import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; +import org.apache.rocketmq.client.trace.core.common.TrackTraceType; +import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.ArrayList; +import java.util.List; + +public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { + + private AsyncDispatcher localDispatcher; + + public ConsumeMessageTraceHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "ConsumeMessageTraceHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + TrackTraceContext traceContext = new TrackTraceContext(); + context.setMqTraceContext(traceContext); + traceContext.setTraceType(TrackTraceType.SubBefore);// + traceContext.setGroupName(context.getConsumerGroup());// + List beans = new ArrayList(); + for (MessageExt msg : context.getMsgList()) { + if (msg == null) { + continue; + } + String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION); + String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH); + + if (traceOn != null && traceOn.equals("false")) { + // if trace switch is false ,skip it + continue; + } + TrackTraceBean traceBean = new TrackTraceBean(); + traceBean.setTopic(msg.getTopic());// + traceBean.setMsgId(msg.getMsgId());// + traceBean.setTags(msg.getTags());// + traceBean.setKeys(msg.getKeys());// + traceBean.setStoreTime(msg.getStoreTimestamp());// + traceBean.setBodyLength(msg.getStoreSize());// + traceBean.setRetryTimes(msg.getReconsumeTimes());// + traceContext.setRegionId(regionId);// + beans.add(traceBean); + } + if (beans.size() > 0) { + traceContext.setTraceBeans(beans); + traceContext.setTimeStamp(System.currentTimeMillis()); + localDispatcher.append(traceContext); + } + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + TrackTraceContext subBeforeContext = (TrackTraceContext) context.getMqTraceContext(); + + if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { + // if subbefore bean is null ,skip it + return; + } + TrackTraceContext subAfterContext = new TrackTraceContext(); + subAfterContext.setTraceType(TrackTraceType.SubAfter);// + subAfterContext.setRegionId(subBeforeContext.getRegionId());// + subAfterContext.setGroupName(subBeforeContext.getGroupName());// + subAfterContext.setRequestId(subBeforeContext.getRequestId());// + subAfterContext.setSuccess(context.isSuccess());// + + //caculate the cost time for processing messages + int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); + subAfterContext.setCostTime(costTime);// + subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); + String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); + if (contextType != null) { + subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); + } + localDispatcher.append(subAfterContext); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..0c38e2203606fecc5b34a23a790a5c36643846c9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.core.hook; + +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; +import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; +import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; +import org.apache.rocketmq.client.trace.core.common.TrackTraceType; +import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.common.MixAll; +import java.util.ArrayList; + +public class SendMessageTrackHookImpl implements SendMessageHook { + + private AsyncDispatcher localDispatcher; + + public SendMessageTrackHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "SendMessageTrackHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + //if it is message track trace data,then it doesn't recorded + if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) { + return; + } + //build the context content of TuxeTraceContext + TrackTraceContext tuxeContext = new TrackTraceContext(); + tuxeContext.setTraceBeans(new ArrayList(1)); + context.setMqTraceContext(tuxeContext); + tuxeContext.setTraceType(TrackTraceType.Pub); + tuxeContext.setGroupName(context.getProducerGroup()); + + //build the data bean object of message track trace + TrackTraceBean traceBean = new TrackTraceBean(); + traceBean.setTopic(context.getMessage().getTopic()); + traceBean.setTags(context.getMessage().getTags()); + traceBean.setKeys(context.getMessage().getKeys()); + traceBean.setStoreHost(context.getBrokerAddr()); + traceBean.setBodyLength(context.getMessage().getBody().length); + traceBean.setMsgType(context.getMsgType()); + tuxeContext.getTraceBeans().add(traceBean); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + //if it is message track trace data,then it doesn't recorded + if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC) + || context.getMqTraceContext() == null) { + return; + } + if (context.getSendResult() == null) { + return; + } + + if (context.getSendResult().getRegionId() == null + || !context.getSendResult().isTraceOn()) { + // if switch is false,skip it + return; + } + + TrackTraceContext tuxeContext = (TrackTraceContext) context.getMqTraceContext(); + TrackTraceBean traceBean = tuxeContext.getTraceBeans().get(0); + int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); + tuxeContext.setCostTime(costTime); + if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { + tuxeContext.setSuccess(true); + } else { + tuxeContext.setSuccess(false); + } + tuxeContext.setRegionId(context.getSendResult().getRegionId()); + traceBean.setMsgId(context.getSendResult().getMsgId()); + traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); + traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); + localDispatcher.append(tuxeContext); + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index ff2fb78bbfb9df89596abc2ad7b35fcc33a2df3e..019b44da4fe68f40a9f5f3ac8676121c33e2ab03 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -89,7 +89,7 @@ public class DefaultMQPushConsumerTest { @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pushConsumer = new DefaultMQPushConsumer(consumerGroup); + pushConsumer = new DefaultMQPushConsumer(consumerGroup,false); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); @@ -252,7 +252,7 @@ public class DefaultMQPushConsumerTest { } private DefaultMQPushConsumer createPushConsumer() { - DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); + DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup,false); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java index d4f581231f0d4d25eb7a403f9200911ce90549da..e6f26f20e8fe56e1da7f968ce4ce78555e246d64 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -42,7 +42,7 @@ public class DefaultMQPushConsumerImplTest { //test message thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)"); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group",false); consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMax(9); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index 796a3943087247c9e5fc75aa5b8fc1cc47c35fb0..f6ca3aca9f77d4b4b7b547b2a185ff3f5d95a5ed 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class RebalancePushImplTest { @Spy - private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null); + private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest",false), null); @Mock private MQClientInstance mqClientInstance; @Mock diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index c225afd6842b24adc49818575d54a2d5ca5e016d..391743614f57e21b715e998d09ac7e241768f2a2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -82,7 +82,7 @@ public class DefaultMQProducerTest { @Before public void init() throws Exception { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - producer = new DefaultMQProducer(producerGroupTemp); + producer = new DefaultMQProducer(producerGroupTemp,false); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setCompressMsgBodyOverHowmuch(16); message = new Message(topic, new byte[] {'a'}); @@ -309,7 +309,7 @@ public class DefaultMQProducerTest { @Test public void testSetCallbackExecutor() throws MQClientException { String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis(); - producer = new DefaultMQProducer(producerGroupTemp); + producer = new DefaultMQProducer(producerGroupTemp,false); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..7d8fb0a6ae6c860d7657b14040b153b96a0f64dd --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import net.bytebuddy.asm.Advice; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMQProducerWithTraceTest { + + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + @Mock + private MQClientAPIImpl mQClientAPIImpl; + + @Spy + private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + @Mock + private MQClientAPIImpl mQClientTraceAPIImpl; + + private AsyncArrayDispatcher asyncArrayDispatcher; + + private DefaultMQProducer producer; + private DefaultMQProducer traceProducer; + + private Message message; + private String topic = "FooBar"; + private String producerGroupPrefix = "FooBar_PID"; + private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); + + @Before + public void init() throws Exception { + + producer = new DefaultMQProducer(producerGroupTemp,true); + producer.setNamesrvAddr("127.0.0.1:9876"); + message = new Message(topic, new byte[] {'a', 'b' ,'c'}); + asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher(); + traceProducer = asyncArrayDispatcher.getTraceProducer(); + + producer.start(); + + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); + + Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + fieldTrace.setAccessible(true); + field.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientTraceFactory, mQClientTraceAPIImpl); + + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); + + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); + + } + + @Test + public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); + final CountDownLatch countDownLatch = new CountDownLatch(1); + try { + producer.send(message); + }catch (MQClientException e){ + } + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + + } + + @Test + public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); + final CountDownLatch countDownLatch = new CountDownLatch(1); + try { + producer.send(message); + }catch (MQClientException e){ + } + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + + } + + @After + public void terminate() { + producer.shutdown(); + } + + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSynFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId("123"); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + return sendResult; + } + + public static TopicRouteData createTraceTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-trace"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10912"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker-trace"); + queueData.setPerm(6); + queueData.setReadQueueNums(1); + queueData.setWriteQueueNums(1); + queueData.setTopicSynFlag(1); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendTraceResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("456"); + sendResult.setOffsetMsgId("456"); + sendResult.setQueueOffset(789); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + return sendResult; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f81af21651581b9a442c2eef24174ac970f7b4a3..77d492ecbb38761371ef7ae630f53e804d133c4b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -51,7 +51,8 @@ public class BrokerConfig { @ImportantField private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; - + @ImportantField + private boolean autoTraceBrokerEnable = false; /** * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default * value is 1. @@ -732,4 +733,12 @@ public class BrokerConfig { public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; } + + public boolean isAutoTraceBrokerEnable() { + return autoTraceBrokerEnable; + } + + public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) { + this.autoTraceBrokerEnable = autoTraceBrokerEnable; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 20d186764ebccd87e148f4ca80ce2d0a6876b234..0573c762abeecfd61c65504604b83e5f847614ec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -82,7 +82,8 @@ public class MixAll { public static final long CURRENT_JVM_PID = getPID(); public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; - + public static final String TRACE_BROKER_NAME_SUFFIX = "trace"; + public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; @@ -90,6 +91,7 @@ public class MixAll { public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; + public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; diff --git a/distribution/conf/2m-noslave/broker-a.properties b/distribution/conf/2m-noslave/broker-a.properties index b704b54c54c72349238ec0f00bea1f5b4c9b7d20..cd051cdc138f52a8101d526f255c105be67330eb 100644 --- a/distribution/conf/2m-noslave/broker-a.properties +++ b/distribution/conf/2m-noslave/broker-a.properties @@ -19,3 +19,4 @@ deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH +autoTraceBrokerEnable=false \ No newline at end of file diff --git a/distribution/conf/2m-noslave/broker-b.properties b/distribution/conf/2m-noslave/broker-b.properties index 130671a7c319eaf8d3b98e62845bedbb3296a291..da00cd41901979af75fb1faaf9a0a362b1574783 100644 --- a/distribution/conf/2m-noslave/broker-b.properties +++ b/distribution/conf/2m-noslave/broker-b.properties @@ -19,3 +19,4 @@ deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH +autoTraceBrokerEnable=false \ No newline at end of file diff --git a/distribution/conf/2m-noslave/broker-trace.properties b/distribution/conf/2m-noslave/broker-trace.properties new file mode 100644 index 0000000000000000000000000000000000000000..a4898aae0c1e4b6b2e3e5648bc33aa4f4c3ec3f8 --- /dev/null +++ b/distribution/conf/2m-noslave/broker-trace.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +brokerClusterName=DefaultCluster +brokerName=broker-trace +brokerId=0 +deleteWhen=04 +fileReservedTime=48 +brokerRole=ASYNC_MASTER +flushDiskType=ASYNC_FLUSH +autoTraceBrokerEnable=true \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java index cf566aa15bd0d58f629ff879a96921acd71c9ad0..e1372ccd7e31068df217cd8849230a8f98ca8528 100644 --- a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.common.message.Message; public class SimpleBatchProducer { public static void main(String[] args) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); + DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName",false); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java index f9495c4171481bc324f10bd36f00cb9b756de4c4..eae0800812de037f8ebce0fa9fe00dd85e23d87a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java @@ -28,7 +28,7 @@ public class SplitBatchProducer { public static void main(String[] args) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); + DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName",false); producer.start(); //large batch diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index d431d3ecc6ad6c05dae4ebe8b7796d930c4905fb..4b6edf0537b11bcb7fd302a7c1be55ccedc60e2c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -102,7 +102,7 @@ public class Consumer { } }, 10000, 10000); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,false); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); if (filterType == null || expression == null) { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index ce2b83f97755c51a3fabda1f22ef13d276245929..16ad133f4980a29c328efbdb5b05dd667e1ea31c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -100,7 +100,7 @@ public class Producer { } }, 10000, 10000); - final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer",false); producer.setInstanceName(Long.toString(System.currentTimeMillis())); if (commandLine.hasOption('n')) { diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java index fb1f9bbde7110fe0b9a500e771cf70a8fe4f0c95..f1a55c6d61cda836fa2d04c2a99228922e61d9fe 100644 --- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1",false); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java index bb491ac40af5a55b9667a4d4fb72c023ce91156c..262f57a9d25d55a90b230b6eb641dc0e2655d2e3 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java @@ -30,7 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4",false); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile()); diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java index 2a0da6546c853ee569e786062923f646b0de59d8..2d575f99fdd244f3016c7fd3a4e0fbc007c1bf62 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",false); producer.start(); try { diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java index c41c9c14c3c15b5a6e67f0c689cbc2bf1206c80a..43409c67def62b1368560925552d03e2022ddf67 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java @@ -30,7 +30,7 @@ import java.util.List; public class SqlConsumer { public static void main(String[] args) { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4",false); try { consumer.subscribe("TopicTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java index 3f3a0e65215b0d1f53720c26a25f9f53bdeac68b..4f319f5c3e6acc9f6d0d8c47ee12d3f384961819 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class SqlProducer { public static void main(String[] args) { - DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); try { producer.start(); } catch (MQClientException e) { diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java index 6936f1dcea374ad281e5ae7e34f0e0120d055399..c536a14c40b99650174af1c3fd10160527677a03 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java @@ -41,7 +41,7 @@ public class Consumer { String subscription = commandLine.getOptionValue('s'); final String returnFailedHalf = commandLine.getOptionValue('f'); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,false); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); consumer.subscribe(topic, subscription); diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java index 1d4336d7fa9b5cdeb14e271dffac092ff53ebe68..20fed39b9f952bbbd927d453391c42410d91ccfd 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java @@ -39,7 +39,7 @@ public class Producer { String keys = commandLine.getOptionValue('k'); String msgCount = commandLine.getOptionValue('c'); - DefaultMQProducer producer = new DefaultMQProducer(group); + DefaultMQProducer producer = new DefaultMQProducer(group,false); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.start(); diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java index abb274d6129278cb6e045ef503e7cdc88a79f912..d7b6cd9591f7d2fd53d08a64aadd25b820f229b0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3",false); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java index 6a6bdc7d710e750f5738ff9a705e3680967e4e24..c1e26910a214923655ba81d9ce01ce0dfef2c7ca 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -32,7 +32,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { - MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index 6d3b936507e9c667fdc816207f81d0f9502ecae2..da70a78ae841fc07b67143bcc772c705bd257032 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -35,7 +35,7 @@ public class Consumer { /* * Instantiate with specified consumer group name. */ - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4",false); /* * Specify name server addresses. diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 53a1d4dd64ac027d52389520c05b7cad08228120..3d2d1c6e9ba9312ce15331ff73c35fc7551c06d6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -31,7 +31,7 @@ public class Producer { /* * Instantiate with a producer group name. */ - DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); /* * Specify name server addresses. diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java index d40739c813d4ad2a7925a76d2f3b6164e025fb8e..14ed57ff9330d0c97c2abac440522e38f78fc536 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java @@ -30,7 +30,7 @@ public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { - DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); + DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test",false); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index 7b504dd2a3e07595abbeb1043e9b6c28b7d6345e..5601e667757873b2cd701efc61492da61fbdb122 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -25,8 +25,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); - + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); producer.start(); for (int i = 0; i < 128; i++) diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index c6c7e39d174f849d5d98f5a448c880d3cf2caf82..16271bb4bc765379931a6d90e837e0e19a66cab1 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -28,11 +28,11 @@ import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); - consumer.subscribe("Jodie_topic_1023", "*"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true); + consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 - consumer.setConsumeTimestamp("20170422221800"); + consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java index 576f9cb6fbf23728ae4040cb5c8ea6c22fef2cee..f16953e34cb0c12cb18aeae5ccb94bd9c83a42d6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class TestProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",false); producer.start(); for (int i = 0; i < 1; i++) diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java index d2adac53dbc6d5db1c1951883f42c710fd755fd5..7ba580df978b45c5998f5bc4a17a0bd069bbcd5a 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java @@ -63,7 +63,7 @@ public class ProducerInstance { return p; } - DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); + DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false); defaultMQProducer.setNamesrvAddr(nameServerAddress); MQProducer beforeProducer = null; beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer); diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java index 38904c0bfc7733d50b8e100b54fe10c597af2b55..1289393850f225a056b04c2667ea643f017704c2 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java @@ -39,7 +39,7 @@ public class AbstractTestCase { @Before public void mockLoggerAppender() throws Exception { - DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender")); + DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false)); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java index ce739be591452f733f7f8def22898dd8a29a7834..a828e8ddf3ea3224b84afcbb97b2bb877b8ff015 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java @@ -46,7 +46,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { } public void create(boolean useTLS) { - consumer = new DefaultMQPushConsumer(consumerGroup); + consumer = new DefaultMQPushConsumer(consumerGroup,false); consumer.setInstanceName(RandomUtil.getStringByUUID()); consumer.setNamesrvAddr(nsAddr); try { diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java index 66767cc9f0d682da8112ff2c989186b745aecd3c..abd1e3b64e6e6be1d5b9a9ebf5d5dd3c7e75d118 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.test.util.RandomUtil; public class ProducerFactory { public static DefaultMQProducer getRMQProducer(String ns) { - DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); + DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false); producer.setNamesrvAddr(ns); try { producer.start(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java index 69d3b24ca737b5b0c687281614addb63f9127554..4296a53ad059891d53f75e4f6ff4d36dd88b65c4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java @@ -71,7 +71,7 @@ public class SendMsgStatusCommand implements SubCommand { @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { - final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook); + final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook,false); producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis()); try { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index 6bb8caad40ad5e4a89816f7f7c5f5a6003223b77..675fc2a812c52c350fe75056bb7d353d494d9c88 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -213,7 +213,7 @@ public class QueryMsgByIdSubCommand implements SubCommand { public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById"); + DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById",false); defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis())); try { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 9bf09ad410754be36bb2cb75e01761d83a02288c..3debb3d15c7a268ca423caeedaf623c06c861040 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -65,7 +65,7 @@ public class MonitorService { private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( MixAll.TOOLS_CONSUMER_GROUP); private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer( - MixAll.MONITOR_CONSUMER_GROUP); + MixAll.MONITOR_CONSUMER_GROUP,false); public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) { this.monitorConfig = monitorConfig;