From eeb1407032ec383ca8d2610af8dd1f3d5dca4994 Mon Sep 17 00:00:00 2001 From: superheizai Date: Thu, 17 Jan 2019 15:24:31 +0800 Subject: [PATCH] fix bug: when producers send msg to multi clusters, only one cluster can receive message trace message (#694) * make isStarted as not static; rename instanceName with nameSrv address as subfix to fix connect multi cluster * fix checkstyle error * remove useless mock in DefaultMQProducerWithTraceTest --- .../client/trace/AsyncTraceDispatcher.java | 41 +++++++------- .../trace/DefaultMQProducerWithTraceTest.java | 55 +++++++------------ 2 files changed, 42 insertions(+), 54 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 55423244..87a795e4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.trace; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -33,20 +32,22 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; import java.io.IOException; -import java.util.List; -import java.util.HashMap; -import java.util.UUID; -import java.util.Set; -import java.util.HashSet; -import java.util.ArrayList; -import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.rocketmq.remoting.RPCHook; +import java.util.UUID; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; + import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; @@ -70,7 +71,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; - private static AtomicBoolean isStarted = new AtomicBoolean(false); + private AtomicBoolean isStarted = new AtomicBoolean(false); public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { @@ -87,12 +88,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } this.traceExecuter = new ThreadPoolExecutor(// - 10, // - 20, // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.appenderQueue, // - new ThreadFactoryImpl("MQTraceSendThread_")); + 10, // + 20, // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.appenderQueue, // + new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); } @@ -103,11 +104,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void setTraceTopicName(String traceTopicName) { this.traceTopicName = traceTopicName; } - + public DefaultMQProducer getTraceProducer() { return traceProducer; } - + public DefaultMQProducerImpl getHostProducer() { return hostProducer; } @@ -127,6 +128,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void start(String nameSrvAddr) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); + traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); @@ -141,7 +143,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher { traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout(5000); - traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); @@ -256,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void run() { sendTraceData(contextList); } - + public void sendTraceData(List contextList) { Map> transBeanMap = new HashMap>(); for (TraceContext context : contextList) { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 6dcceeb5..903be01c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -17,12 +17,6 @@ package org.apache.rocketmq.client.trace; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,11 +46,14 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -67,11 +64,6 @@ public class DefaultMQProducerWithTraceTest { @Mock private MQClientAPIImpl mQClientAPIImpl; - @Spy - private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); - @Mock - private MQClientAPIImpl mQClientTraceAPIImpl; - private AsyncTraceDispatcher asyncTraceDispatcher; private DefaultMQProducer producer; @@ -89,56 +81,52 @@ public class DefaultMQProducerWithTraceTest { @Before public void init() throws Exception { - customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic); - normalProducer = new DefaultMQProducer(producerGroupTemp,false,""); - producer = new DefaultMQProducer(producerGroupTemp,true,""); + customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp, false, customerTraceTopic); + normalProducer = new DefaultMQProducer(producerGroupTemp, false, ""); + producer = new DefaultMQProducer(producerGroupTemp, true, ""); producer.setNamesrvAddr("127.0.0.1:9876"); normalProducer.setNamesrvAddr("127.0.0.1:9877"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); - message = new Message(topic, new byte[] {'a', 'b' ,'c'}); - asyncTraceDispatcher = (AsyncTraceDispatcher)producer.getTraceDispatcher(); + message = new Message(topic, new byte[]{'a', 'b', 'c'}); + asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.getHostProducer(); asyncTraceDispatcher.getHostConsumer(); traceProducer = asyncTraceDispatcher.getTraceProducer(); producer.start(); - + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); fieldTrace.setAccessible(true); - fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); + fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); - field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); - field.setAccessible(true); - field.set(mQClientTraceFactory, mQClientTraceAPIImpl); producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); + } @Test public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); final CountDownLatch countDownLatch = new CountDownLatch(1); try { producer.send(message); - }catch (MQClientException e){ + } catch (MQClientException e) { } countDownLatch.await(3000L, TimeUnit.MILLISECONDS); @@ -147,11 +135,10 @@ public class DefaultMQProducerWithTraceTest { @Test public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); final CountDownLatch countDownLatch = new CountDownLatch(1); try { producer.send(message); - }catch (MQClientException e){ + } catch (MQClientException e) { } countDownLatch.await(3000L, TimeUnit.MILLISECONDS); -- GitLab