提交 eeb14070 编写于 作者: S superheizai 提交者: Zhendong Liu

fix bug: when producers send msg to multi clusters, only one cluster can...

fix bug: when producers send msg to multi clusters, only one cluster can receive message trace message (#694)

* make isStarted as not static; rename  instanceName with nameSrv address as subfix to fix connect multi cluster

* fix checkstyle error

* remove useless mock in DefaultMQProducerWithTraceTest
上级 e7fcb284
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
...@@ -33,20 +32,22 @@ import org.apache.rocketmq.common.UtilAll; ...@@ -33,20 +32,22 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.HashMap;
import java.util.UUID;
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook; import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
...@@ -70,7 +71,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -70,7 +71,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
private static AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isStarted = new AtomicBoolean(false);
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
...@@ -87,12 +88,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -87,12 +88,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
} }
this.traceExecuter = new ThreadPoolExecutor(// this.traceExecuter = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
1000 * 60, // 1000 * 60, //
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook); traceProducer = getAndCreateTraceProducer(rpcHook);
} }
...@@ -103,11 +104,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -103,11 +104,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void setTraceTopicName(String traceTopicName) { public void setTraceTopicName(String traceTopicName) {
this.traceTopicName = traceTopicName; this.traceTopicName = traceTopicName;
} }
public DefaultMQProducer getTraceProducer() { public DefaultMQProducer getTraceProducer() {
return traceProducer; return traceProducer;
} }
public DefaultMQProducerImpl getHostProducer() { public DefaultMQProducerImpl getHostProducer() {
return hostProducer; return hostProducer;
} }
...@@ -127,6 +128,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -127,6 +128,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void start(String nameSrvAddr) throws MQClientException { public void start(String nameSrvAddr) throws MQClientException {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start(); traceProducer.start();
} }
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
...@@ -141,7 +143,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -141,7 +143,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME);
traceProducerInstance.setVipChannelEnabled(false); traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K // The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
...@@ -256,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -256,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() { public void run() {
sendTraceData(contextList); sendTraceData(contextList);
} }
public void sendTraceData(List<TraceContext> contextList) { public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>(); Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) { for (TraceContext context : contextList) {
......
...@@ -17,12 +17,6 @@ ...@@ -17,12 +17,6 @@
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -52,11 +46,14 @@ import org.mockito.Mock; ...@@ -52,11 +46,14 @@ import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any; import java.lang.reflect.Field;
import static org.mockito.ArgumentMatchers.anyInt; import java.util.ArrayList;
import static org.mockito.ArgumentMatchers.anyLong; import java.util.HashMap;
import static org.mockito.ArgumentMatchers.anyString; import java.util.List;
import static org.mockito.ArgumentMatchers.nullable; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
...@@ -67,11 +64,6 @@ public class DefaultMQProducerWithTraceTest { ...@@ -67,11 +64,6 @@ public class DefaultMQProducerWithTraceTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
@Spy
private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private AsyncTraceDispatcher asyncTraceDispatcher; private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer producer; private DefaultMQProducer producer;
...@@ -89,56 +81,52 @@ public class DefaultMQProducerWithTraceTest { ...@@ -89,56 +81,52 @@ public class DefaultMQProducerWithTraceTest {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic); customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp, false, customerTraceTopic);
normalProducer = new DefaultMQProducer(producerGroupTemp,false,""); normalProducer = new DefaultMQProducer(producerGroupTemp, false, "");
producer = new DefaultMQProducer(producerGroupTemp,true,""); producer = new DefaultMQProducer(producerGroupTemp, true, "");
producer.setNamesrvAddr("127.0.0.1:9876"); producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877"); normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b' ,'c'}); message = new Message(topic, new byte[]{'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher)producer.getTraceDispatcher(); asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncTraceDispatcher.getHostProducer(); asyncTraceDispatcher.getHostProducer();
asyncTraceDispatcher.getHostConsumer(); asyncTraceDispatcher.getHostConsumer();
traceProducer = asyncTraceDispatcher.getTraceProducer(); traceProducer = asyncTraceDispatcher.getTraceProducer();
producer.start(); producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true); field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true); fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true); field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl); field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientTraceFactory, mQClientTraceAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK)); .thenReturn(createSendResult(SendStatus.SEND_OK));
} }
@Test @Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
try { try {
producer.send(message); producer.send(message);
}catch (MQClientException e){ } catch (MQClientException e) {
} }
countDownLatch.await(3000L, TimeUnit.MILLISECONDS); countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
...@@ -147,11 +135,10 @@ public class DefaultMQProducerWithTraceTest { ...@@ -147,11 +135,10 @@ public class DefaultMQProducerWithTraceTest {
@Test @Test
public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
try { try {
producer.send(message); producer.send(message);
}catch (MQClientException e){ } catch (MQClientException e) {
} }
countDownLatch.await(3000L, TimeUnit.MILLISECONDS); countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册