From 47d88147526b4ab56559a177486b3dbd1e0cd9aa Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 17 May 2019 01:38:28 +0800 Subject: [PATCH] (1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found --- .../apache/rocketmq/client/AccessChannel.java | 21 +++++++++++++++ .../apache/rocketmq/client/ClientConfig.java | 10 +++++++ .../consumer/DefaultMQPushConsumer.java | 2 +- .../client/producer/DefaultMQProducer.java | 13 +++++---- .../client/trace/AsyncTraceDispatcher.java | 27 +++++++++++++------ .../client/trace/TraceDispatcher.java | 3 ++- .../hook/ConsumeMessageTraceHookImpl.java | 7 ++--- .../trace/hook/SendMessageTraceHookImpl.java | 9 ++++--- 8 files changed, 70 insertions(+), 22 deletions(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/AccessChannel.java diff --git a/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java new file mode 100644 index 00000000..82978b0a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client; +public enum AccessChannel { + local, + cloud, +} diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 6493f2d6..53ac3538 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -37,6 +37,8 @@ public class ClientConfig { private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); protected String namespace; + protected AccessChannel accessChannel = AccessChannel.local; + /** * Pulling topic information interval from the named server */ @@ -263,6 +265,14 @@ public class ClientConfig { this.namespace = namespace; } + public AccessChannel getAccessChannel() { + return this.accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 44edfb68..339f799f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { - traceDispatcher.start(this.getNamesrvAddr()); + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index d5fbde03..b4acf8f1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name. + * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic + * name. * * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. */ - public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, + boolean enableMsgTrace, final String customizedTraceTopic) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); @@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { - traceDispatcher.start(this.getNamesrvAddr()); + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } @@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - Validators.checkMessage(msg,this); + Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 0aaadb18..3b5fc1d0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private final int batchSize; private final int maxMsgSize; private final DefaultMQProducer traceProducer; - private final ThreadPoolExecutor traceExecuter; + private final ThreadPoolExecutor traceExecutor; // The last discard number of log private AtomicLong discardCount; private Thread worker; @@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); + private AccessChannel accessChannel = AccessChannel.local; - public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { + public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; @@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } - this.traceExecuter = new ThreadPoolExecutor(// + this.traceExecutor = new ThreadPoolExecutor(// 10, // 20, // 1000 * 60, // @@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher { traceProducer = getAndCreateTraceProducer(rpcHook); } + public AccessChannel getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } + public String getTraceTopicName() { return traceTopicName; } @@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.hostConsumer = hostConsumer; } - public void start(String nameSrvAddr) throws MQClientException { + public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } + this.accessChannel = accessChannel; this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); @@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { @Override public void shutdown() { this.stopped = true; - this.traceExecuter.shutdown(); + this.traceExecutor.shutdown(); if (isStarted.get()) { traceProducer.shutdown(); } @@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } if (contexts.size() > 0) { AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); - traceExecuter.submit(request); + traceExecutor.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } @@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { */ private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, String regionId) { String traceTopic = traceTopicName; - if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) { + if (AccessChannel.cloud == accessChannel){ traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; } final Message message = new Message(traceTopic, data.getBytes()); - // Keyset of message trace includes msgId of or original message message.setKeys(keySet); try { @@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { + } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 275e6a32..51cc0deb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import java.io.IOException; @@ -27,7 +28,7 @@ public interface TraceDispatcher { /** * Initialize asynchronous transfer data module */ - void start(String nameSrvAddr) throws MQClientException; + void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException; /** * Append the transfering data diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java index 38ec8b97..f30b1211 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.protocol.NamespaceUtil; public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { @@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { TraceContext traceContext = new TraceContext(); context.setMqTraceContext(traceContext); traceContext.setTraceType(TraceType.SubBefore);// - traceContext.setGroupName(context.getConsumerGroup());// + traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));// List beans = new ArrayList(); for (MessageExt msg : context.getMsgList()) { if (msg == null) { @@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { continue; } TraceBean traceBean = new TraceBean(); - traceBean.setTopic(msg.getTopic());// + traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));// traceBean.setMsgId(msg.getMsgId());// traceBean.setTags(msg.getTags());// traceBean.setKeys(msg.getKeys());// @@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { TraceContext subAfterContext = new TraceContext(); subAfterContext.setTraceType(TraceType.SubAfter);// subAfterContext.setRegionId(subBeforeContext.getRegionId());// - subAfterContext.setGroupName(subBeforeContext.getGroupName());// + subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));// subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setSuccess(context.isSuccess());// diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index 20396c6d..80c7babd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -16,15 +16,16 @@ */ package org.apache.rocketmq.client.trace.hook; +import java.util.ArrayList; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; -import org.apache.rocketmq.client.trace.TraceBean; import org.apache.rocketmq.client.trace.TraceType; -import java.util.ArrayList; +import org.apache.rocketmq.common.protocol.NamespaceUtil; public class SendMessageTraceHookImpl implements SendMessageHook { @@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook { tuxeContext.setTraceBeans(new ArrayList(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); - tuxeContext.setGroupName(context.getProducerGroup()); + tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup())); //build the data bean object of message trace TraceBean traceBean = new TraceBean(); - traceBean.setTopic(context.getMessage().getTopic()); + traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic())); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); -- GitLab