diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 5b7e635093ca06e245b66949dd9a7eab60150c4b..04b760eec2281561a70c7723208f10d1125d457f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -20,8 +20,12 @@ package org.apache.rocketmq.client.consumer; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.*; - +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; @@ -46,17 +50,15 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RPCHook; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -70,9 +72,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(DefaultLitePullConsumerImpl.class) -@PowerMockIgnore("javax.management.*") +@RunWith(MockitoJUnitRunner.class) public class DefaultLitePullConsumerTest { @Spy private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @@ -94,7 +94,10 @@ public class DefaultLitePullConsumerTest { @Before public void init() throws Exception { - PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.forEach((s, instance) -> instance.shutdown()); + factoryTable.clear(); + Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); field.setAccessible(true); RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); @@ -182,7 +185,9 @@ public class DefaultLitePullConsumerTest { when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L); MessageQueue messageQueue = createMessageQueue(); - litePullConsumer.assign(Collections.singletonList(messageQueue)); + List messageQueues = Collections.singletonList(messageQueue); + litePullConsumer.assign(messageQueues); + litePullConsumer.pause(messageQueues); long offset = litePullConsumer.committed(messageQueue); litePullConsumer.seek(messageQueue, offset); Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); @@ -198,7 +203,9 @@ public class DefaultLitePullConsumerTest { when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L); MessageQueue messageQueue = createMessageQueue(); - litePullConsumer.assign(Collections.singletonList(messageQueue)); + List messageQueues = Collections.singletonList(messageQueue); + litePullConsumer.assign(messageQueues); + litePullConsumer.pause(messageQueues); litePullConsumer.seekToBegin(messageQueue); Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); field.setAccessible(true); @@ -213,7 +220,9 @@ public class DefaultLitePullConsumerTest { when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L); MessageQueue messageQueue = createMessageQueue(); - litePullConsumer.assign(Collections.singletonList(messageQueue)); + List messageQueues = Collections.singletonList(messageQueue); + litePullConsumer.assign(messageQueues); + litePullConsumer.pause(messageQueues); litePullConsumer.seekToEnd(messageQueue); Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); field.setAccessible(true); @@ -228,7 +237,9 @@ public class DefaultLitePullConsumerTest { when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L); MessageQueue messageQueue = createMessageQueue(); - litePullConsumer.assign(Collections.singletonList(messageQueue)); + List messageQueues = Collections.singletonList(messageQueue); + litePullConsumer.assign(messageQueues); + litePullConsumer.pause(messageQueues); try { litePullConsumer.seek(messageQueue, -1); failBecauseExceptionWasNotThrown(MQClientException.class); @@ -517,9 +528,6 @@ public class DefaultLitePullConsumerTest { public void testConsumerAfterShutdown() throws Exception { DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer(); - DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer); - when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>()); - new AsyncConsumer().executeAsync(defaultLitePullConsumer); Thread.sleep(100); @@ -576,9 +584,9 @@ public class DefaultLitePullConsumerTest { when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public Object answer(InvocationOnMock mock) throws Throwable { + public PullResult answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); messageClientExt.setTopic(topic); @@ -604,6 +612,7 @@ public class DefaultLitePullConsumerTest { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; @@ -612,6 +621,7 @@ public class DefaultLitePullConsumerTest { private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; @@ -627,6 +637,7 @@ public class DefaultLitePullConsumerTest { litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setMessageModel(MessageModel.BROADCASTING); litePullConsumer.subscribe(topic, "*"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); litePullConsumer.start(); initDefaultLitePullConsumer(litePullConsumer); return litePullConsumer; @@ -648,4 +659,15 @@ public class DefaultLitePullConsumerTest { } return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); } + + private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { + DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true); + if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { + litePullConsumer.changeInstanceNameToPID(); + } + MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true))); + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 924ee12c84a442eb5a779de80963863773f00403..48d9b3a91a72af27ae1724b6bd88750aa72f332d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -23,9 +23,12 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; @@ -37,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -45,13 +49,14 @@ import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullResultExt; -import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Assert; @@ -60,11 +65,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -77,9 +79,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(DefaultMQPushConsumerImpl.class) -@PowerMockIgnore("javax.management.*") +@RunWith(MockitoJUnitRunner.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; @@ -89,11 +89,15 @@ public class DefaultMQPushConsumerTest { @Mock private MQClientAPIImpl mQClientAPIImpl; private PullAPIWrapper pullAPIWrapper; - private RebalancePushImpl rebalancePushImpl; + private RebalanceImpl rebalanceImpl; private DefaultMQPushConsumer pushConsumer; @Before public void init() throws Exception { + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.forEach((s, instance) -> instance.shutdown()); + factoryTable.clear(); + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); @@ -108,16 +112,21 @@ public class DefaultMQPushConsumerTest { }); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); - PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); - rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); + + // suppress updateTopicRouteInfoFromNameServer + pushConsumer.changeInstanceNameToPID(); + mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); + factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + + rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl()); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); - field.set(pushConsumerImpl, rebalancePushImpl); + field.set(pushConsumerImpl, rebalanceImpl); pushConsumer.subscribe(topic, "*"); pushConsumer.start(); - mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(pushConsumerImpl, mQClientFactory); @@ -131,14 +140,13 @@ public class DefaultMQPushConsumerTest { field.setAccessible(true); field.set(pushConsumerImpl, pullAPIWrapper); - pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public Object answer(InvocationOnMock mock) throws Throwable { + public PullResult answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); messageClientExt.setTopic(topic); @@ -155,11 +163,10 @@ public class DefaultMQPushConsumerTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); - doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); - doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class)); + doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); } @After @@ -175,12 +182,12 @@ public class DefaultMQPushConsumerTest { @Test public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final MessageExt[] messageExts = new MessageExt[1]; + final AtomicReference messageAtomic = new AtomicReference<>(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - messageExts[0] = msgs.get(0); + messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return null; } @@ -188,20 +195,22 @@ public class DefaultMQPushConsumerTest { PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); pullMessageService.executePullRequestImmediately(createPullRequest()); - countDownLatch.await(); - assertThat(messageExts[0].getTopic()).isEqualTo(topic); - assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); + countDownLatch.await(10, TimeUnit.SECONDS); + MessageExt msg = messageAtomic.get(); + assertThat(msg).isNotNull(); + assertThat(msg.getTopic()).isEqualTo(topic); + assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } @Test public void testPullMessage_SuccessWithOrderlyService() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - final MessageExt[] messageExts = new MessageExt[1]; + final AtomicReference messageAtomic = new AtomicReference<>(); MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { - messageExts[0] = msgs.get(0); + messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return null; } @@ -214,8 +223,10 @@ public class DefaultMQPushConsumerTest { pullMessageService.executePullRequestLater(createPullRequest(), 100); countDownLatch.await(10, TimeUnit.SECONDS); - assertThat(messageExts[0].getTopic()).isEqualTo(topic); - assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); + MessageExt msg = messageAtomic.get(); + assertThat(msg).isNotNull(); + assertThat(msg.getTopic()).isEqualTo(topic); + assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } @Test @@ -281,7 +292,7 @@ public class DefaultMQPushConsumerTest { PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); pullMessageService.executePullRequestImmediately(createPullRequest()); - countDownLatch.await(); + assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue(); pushConsumer.shutdown(); assertThat(messageConsumedFlag.get()).isTrue(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index 921743c6be864401c6ba4aefb7e087cb26d4c492..e8feb80dd99f45f4713605f797130fba6e75f9e5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -16,7 +16,19 @@ */ package org.apache.rocketmq.client.impl.consumer; -import org.apache.rocketmq.client.consumer.*; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -40,23 +52,15 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; - import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.*; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; @@ -117,9 +121,9 @@ public class ConsumeMessageConcurrentlyServiceTest { when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public Object answer(InvocationOnMock mock) throws Throwable { + public PullResult answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); messageClientExt.setTopic(topic); @@ -145,13 +149,13 @@ public class ConsumeMessageConcurrentlyServiceTest { @Test public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - final MessageExt[] messageExts = new MessageExt[1]; + final AtomicReference messageAtomic = new AtomicReference<>(); ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - messageExts[0] = msgs.get(0); + messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @@ -175,8 +179,10 @@ public class ConsumeMessageConcurrentlyServiceTest { StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName()); assertThat(item.getValue().get()).isGreaterThan(0L); - assertThat(messageExts[0].getTopic()).isEqualTo(topic); - assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); + MessageExt msg = messageAtomic.get(); + assertThat(msg).isNotNull(); + assertThat(msg.getTopic()).isEqualTo(topic); + assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } @After diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index d8f1e18d28841427a565e441d2d0690e7c91a29d..d37844ca9631ed801779571fd193bcee6135a20b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -39,7 +40,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.internal.util.reflection.FieldSetter; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -55,7 +55,7 @@ public class MQClientInstanceTest { @Before public void init() throws Exception { - FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable); + FieldUtils.writeDeclaredField(mqClientInstance, "brokerAddrTable", brokerAddrTable, true); } @Test diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java index 16a3d0275b314b4b3cd833901fafb7a34c03a6ef..c173b8ef790b1711e906fad8a2c8f3ff26e7b70f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java @@ -20,6 +20,18 @@ package org.apache.rocketmq.client.trace; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -27,11 +39,14 @@ import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; @@ -47,28 +62,17 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -80,9 +84,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(DefaultMQPushConsumerImpl.class) -@PowerMockIgnore("javax.management.*") +@RunWith(MockitoJUnitRunner.class) public class DefaultMQConsumerWithOpenTracingTest { private String consumerGroup; @@ -99,6 +101,10 @@ public class DefaultMQConsumerWithOpenTracingTest { @Before public void init() throws Exception { + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.forEach((s, instance) -> instance.shutdown()); + factoryTable.clear(); + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( @@ -106,6 +112,10 @@ public class DefaultMQConsumerWithOpenTracingTest { pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); + OffsetStore offsetStore = Mockito.mock(OffsetStore.class); + Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L); + pushConsumer.setOffsetStore(offsetStore); + pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, @@ -114,8 +124,14 @@ public class DefaultMQConsumerWithOpenTracingTest { } }); - PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + + // suppress updateTopicRouteInfoFromNameServer + pushConsumer.changeInstanceNameToPID(); + mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); + factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); @@ -124,8 +140,6 @@ public class DefaultMQConsumerWithOpenTracingTest { pushConsumer.start(); - mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); - field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(pushConsumerImpl, mQClientFactory); @@ -142,11 +156,11 @@ public class DefaultMQConsumerWithOpenTracingTest { pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); - when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public Object answer(InvocationOnMock mock) throws Throwable { + public PullResult answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); messageClientExt.setTopic(topic); @@ -176,12 +190,12 @@ public class DefaultMQConsumerWithOpenTracingTest { @Test public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final MessageExt[] messageExts = new MessageExt[1]; + final AtomicReference messageAtomic = new AtomicReference<>(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - messageExts[0] = msgs.get(0); + messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @@ -189,9 +203,11 @@ public class DefaultMQConsumerWithOpenTracingTest { PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); pullMessageService.executePullRequestImmediately(createPullRequest()); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - assertThat(messageExts[0].getTopic()).isEqualTo(topic); - assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'}); + countDownLatch.await(30, TimeUnit.SECONDS); + MessageExt msg = messageAtomic.get(); + assertThat(msg).isNotNull(); + assertThat(msg.getTopic()).isEqualTo(topic); + assertThat(msg.getBody()).isEqualTo(new byte[]{'a'}); assertThat(tracer.finishedSpans().size()).isEqualTo(1); MockSpan span = tracer.finishedSpans().get(0); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index c54c0bb7a4fa284904caca14df812b551ffc9986..aec7d2cb0e28e57884bb468b41d831ac23ece99b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -26,8 +26,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -40,6 +43,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; @@ -53,17 +57,16 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; - import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; @@ -71,11 +74,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -87,9 +87,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(DefaultMQPushConsumerImpl.class) -@PowerMockIgnore("javax.management.*") +@RunWith(MockitoJUnitRunner.class) public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; @@ -116,6 +114,10 @@ public class DefaultMQConsumerWithTraceTest { @Before public void init() throws Exception { + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.forEach((s, instance) -> instance.shutdown()); + factoryTable.clear(); + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, ""); consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis(); @@ -135,8 +137,14 @@ public class DefaultMQConsumerWithTraceTest { } }); - PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + + // suppress updateTopicRouteInfoFromNameServer + pushConsumer.changeInstanceNameToPID(); + mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); + factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); @@ -174,9 +182,9 @@ public class DefaultMQConsumerWithTraceTest { when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public Object answer(InvocationOnMock mock) throws Throwable { + public PullResult answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); messageClientExt.setTopic(topic); @@ -208,12 +216,12 @@ public class DefaultMQConsumerWithTraceTest { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); final CountDownLatch countDownLatch = new CountDownLatch(1); - final MessageExt[] messageExts = new MessageExt[1]; + final AtomicReference messageAtomic = new AtomicReference<>(); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - messageExts[0] = msgs.get(0); + messageAtomic.set(msgs.get(0)); countDownLatch.countDown(); return null; } @@ -221,9 +229,11 @@ public class DefaultMQConsumerWithTraceTest { PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); pullMessageService.executePullRequestImmediately(createPullRequest()); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - assertThat(messageExts[0].getTopic()).isEqualTo(topic); - assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); + countDownLatch.await(30, TimeUnit.SECONDS); + MessageExt msg = messageAtomic.get(); + assertThat(msg).isNotNull(); + assertThat(msg.getTopic()).isEqualTo(topic); + assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } private PullRequest createPullRequest() { diff --git a/client/src/test/resources/log4j2.xml b/client/src/test/resources/log4j2.xml new file mode 100644 index 0000000000000000000000000000000000000000..52cf2a88697c905acf266f6a65becd18e1ee359c --- /dev/null +++ b/client/src/test/resources/log4j2.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java index 428640a383b6149093965155ab5638515794acbc..514f3ceb7fe6fe0f3491de05f64485072118df37 100644 --- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java @@ -57,7 +57,7 @@ public class OpenTracingTransactionProducer { try { Message msg = new Message("TopicTest", "Tag", "KEY", - ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); + "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { diff --git a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java index b6d10497782d9714b989e682ea1a2ea2ccc88a7c..aaba4d6e89d01b64abfcbbe6d33551fc82427645 100755 --- a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java +++ b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java @@ -33,13 +33,13 @@ public class SysLogger { public static void debug(String msg) { if (debugEnabled && !quietMode) { - System.out.printf("%s", PREFIX + msg); + System.err.println(PREFIX + msg); } } public static void debug(String msg, Throwable t) { if (debugEnabled && !quietMode) { - System.out.printf("%s", PREFIX + msg); + System.err.println(PREFIX + msg); if (t != null) { t.printStackTrace(System.out); } diff --git a/pom.xml b/pom.xml index 05878e5c83cae7629f48dce665f4c57a2ffb054f..3714eb344ee21b5c180be979e46b7c11fe83a49c 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,6 @@ ${project.basedir}/../test/target/jacoco-it.exec file:**/generated-sources/**,**/test/** - 2.0.2 @@ -425,7 +424,7 @@ junit junit - 4.11 + 4.13.2 test @@ -437,19 +436,7 @@ org.mockito mockito-core - 2.23.0 - test - - - org.powermock - powermock-module-junit4 - ${powermock.version} - test - - - org.powermock - powermock-api-mockito2 - ${powermock.version} + 3.10.0 test