diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4244bddcf2f2a1edf3b196c5fb9026180bf6d123..ae9ed6c3ab341da4018042da13e7485d8cf5fd60 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Iterator; -import java.util.Collections; -import java.util.ArrayList; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; @@ -49,12 +49,12 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -152,7 +152,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; - public class MQClientAPIImpl { private final static Logger log = ClientLogger.getLog(); @@ -169,7 +168,8 @@ public class MQClientAPIImpl { private String nameSrvAddr = null; private ClientConfig clientConfig; - public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, + public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, + final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, final ClientConfig clientConfig) { this.clientConfig = clientConfig; topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); @@ -233,7 +233,8 @@ public class MQClientAPIImpl { this.remotingClient.shutdown(); } - public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis) + public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); @@ -255,7 +256,8 @@ public class MQClientAPIImpl { } - public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) + public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); @@ -284,14 +286,14 @@ public class MQClientAPIImpl { } public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendMessageContext context, // 7 + final DefaultMQProducerImpl producer // 8 ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } @@ -340,11 +342,11 @@ public class MQClientAPIImpl { } private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -619,7 +621,8 @@ public class MQClientAPIImpl { return this.processPullResponse(response); } - private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException { + private PullResult processPullResponse( + final RemotingCommand response) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: @@ -668,7 +671,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) + public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(topic); @@ -767,7 +771,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis) + public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); requestHeader.setTopic(topic); @@ -1009,7 +1014,8 @@ public class MQClientAPIImpl { } } - public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException, + public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader(); requestHeader.setTopic(topic); @@ -1036,7 +1042,8 @@ public class MQClientAPIImpl { return getConsumeStats(addr, consumerGroup, null, timeoutMillis); } - public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis) + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); @@ -1059,7 +1066,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis) + public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader(); @@ -1080,7 +1088,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis) + public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); @@ -1160,7 +1169,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + public ClusterInfo getBrokerClusterInfo( + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); @@ -1179,33 +1189,18 @@ public class MQClientAPIImpl { public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); - requestHeader.setTopic(topic); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.TOPIC_NOT_EXIST: { - // TODO LOG - break; - } - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - return TopicRouteData.decode(body, TopicRouteData.class); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); + return getTopicRouteInfoFromNameServer(topic, timeoutMillis, false); } public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + + return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true); + } + + public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, + boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); @@ -1215,8 +1210,10 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.TOPIC_NOT_EXIST: { - if (!topic.equals(MixAll.DEFAULT_TOPIC)) + if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); + } + // TODO :- Log when if condition is not satisfied break; } case ResponseCode.SUCCESS: { @@ -1252,7 +1249,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException, + public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, + final long timeoutMillis) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader(); requestHeader.setBrokerName(brokerName); @@ -1471,8 +1469,10 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Map> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, - final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + public Map> invokeBrokerToGetConsumerStatus(final String addr, final String topic, + final String group, + final String clientAddr, + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); @@ -1519,7 +1519,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public List queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis) + public List queryConsumeTimeSpan(final String addr, final String topic, final String group, + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader(); @@ -1593,7 +1594,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + public TopicList getSystemTopicList( + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1642,7 +1644,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, + public boolean cleanExpiredConsumeQueue(final String addr, + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1658,7 +1661,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, + public boolean cleanUnusedTopicByAddr(final String addr, + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1674,7 +1678,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack, + public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, + boolean jstack, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); @@ -1731,7 +1736,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Map queryCorrectionOffset(final String addr, final String topic, final String group, Set filterGroup, + public Map queryCorrectionOffset(final String addr, final String topic, final String group, + Set filterGroup, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader(); @@ -1854,7 +1860,8 @@ public class MQClientAPIImpl { } public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic, - final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final boolean isOffline, + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader(); requestHeader.setSrcGroup(srcGroup); requestHeader.setDestGroup(destGroup); @@ -1902,13 +1909,15 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Set getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException, + public Set getClusterList(String topic, + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // todo:jodie return Collections.EMPTY_SET; } - public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException, + public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader(); requestHeader.setIsOrder(isOrder); @@ -1932,7 +1941,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, + public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = this.remotingClient @@ -1948,7 +1958,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, long timeoutMillis) throws RemotingConnectException, + public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, + long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); @@ -2028,9 +2039,10 @@ public class MQClientAPIImpl { return configMap; } - public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId, - final long index, final int count, final String consumerGroup, - final long timeoutMillis) throws InterruptedException, + public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, + final int queueId, + final long index, final int count, final String consumerGroup, + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader(); @@ -2054,8 +2066,8 @@ public class MQClientAPIImpl { } public void checkClientInBroker(final String brokerAddr, final String consumerGroup, - final String clientId, final SubscriptionData subscriptionData, - final long timeoutMillis) + final String clientId, final SubscriptionData subscriptionData, + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7b5ac45eddfac9a6e136ac58d9e6a43f6266825a..49a1ebaea3d9282f857075cda77ecb62ddbe85f4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -413,8 +413,9 @@ public class DefaultMessageStore implements MessageStore { return commitLog; } - public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, - final MessageFilter messageFilter) { + public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, + final int maxMsgNums, + final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; @@ -717,22 +718,27 @@ public class DefaultMessageStore implements MessageStore { long minLogicOffset = logicQueue.getMinLogicOffset(); SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE); - if (result != null) { - try { - final long phyOffset = result.getByteBuffer().getLong(); - final int size = result.getByteBuffer().getInt(); - long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); - return storeTime; - } catch (Exception e) { - } finally { - result.release(); - } - } + return getStoreTime(result); } return -1; } + private long getStoreTime(SelectMappedBufferResult result) { + if (result != null) { + try { + final long phyOffset = result.getByteBuffer().getLong(); + final int size = result.getByteBuffer().getInt(); + long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); + return storeTime; + } catch (Exception e) { + } finally { + result.release(); + } + } + return -1; + } + @Override public long getEarliestMessageTime() { final long minPhyOffset = this.getMinPhyOffset(); @@ -745,17 +751,7 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); - if (result != null) { - try { - final long phyOffset = result.getByteBuffer().getLong(); - final int size = result.getByteBuffer().getInt(); - long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); - return storeTime; - } catch (Exception ignored) { - } finally { - result.release(); - } - } + return getStoreTime(result); } return -1; @@ -955,7 +951,8 @@ public class DefaultMessageStore implements MessageStore { } } - public Map getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) { + public Map getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, + SocketAddress storeHost) { Map messageIds = new HashMap(); if (this.shutdown) { return messageIds; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index ac78a1d304e095e65db19456b52f4714506a389f..a81f3285e6ddf6d6f737e0a36ba4aa7abfea51a0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -77,20 +77,7 @@ public class DefaultMessageStoreTest { assertTrue(load); master.start(); - try { - for (long i = 0; i < totalMsgs; i++) { - master.putMessage(buildMessage()); - } - - for (long i = 0; i < totalMsgs; i++) { - GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); - assertThat(result).isNotNull(); - result.release(); - } - } finally { - master.shutdown(); - master.destroy(); - } + verifyThatMasterIsFunctional(totalMsgs, master); } public MessageExtBrokerInner buildMessage() { @@ -121,6 +108,10 @@ public class DefaultMessageStoreTest { assertTrue(load); master.start(); + verifyThatMasterIsFunctional(totalMsgs, master); + } + + private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) { try { for (long i = 0; i < totalMsgs; i++) { master.putMessage(buildMessage()); @@ -171,7 +162,7 @@ public class DefaultMessageStoreTest { private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { + byte[] filterBitMap, Map properties) { } } }