提交 ffad6566 编写于 作者: R Ritabrata Moitra 提交者: yukon

[ROCKETMQ-209]Remove duplicated code in class MQClientAPIImpl

Author: Ritabrata Moitra <rmoitra@thoughtworks.com>

Closes #134 from Ritabrata-TW/master.
上级 ccc2235a
......@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Iterator;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
......@@ -49,12 +49,12 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -152,7 +152,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog();
......@@ -169,7 +168,8 @@ public class MQClientAPIImpl {
private String nameSrvAddr = null;
private ClientConfig clientConfig;
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
......@@ -233,7 +233,8 @@ public class MQClientAPIImpl {
this.remotingClient.shutdown();
}
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
......@@ -255,7 +256,8 @@ public class MQClientAPIImpl {
}
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
......@@ -619,7 +621,8 @@ public class MQClientAPIImpl {
return this.processPullResponse(response);
}
private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
......@@ -668,7 +671,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
......@@ -767,7 +771,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader.setTopic(topic);
......@@ -1009,7 +1014,8 @@ public class MQClientAPIImpl {
}
}
public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
requestHeader.setTopic(topic);
......@@ -1036,7 +1042,8 @@ public class MQClientAPIImpl {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
}
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
......@@ -1059,7 +1066,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
......@@ -1080,7 +1088,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
......@@ -1160,7 +1169,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
public ClusterInfo getBrokerClusterInfo(
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
......@@ -1179,33 +1189,18 @@ public class MQClientAPIImpl {
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
// TODO LOG
break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, false);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
......@@ -1215,8 +1210,10 @@ public class MQClientAPIImpl {
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (!topic.equals(MixAll.DEFAULT_TOPIC))
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
// TODO :- Log when if condition is not satisfied
break;
}
case ResponseCode.SUCCESS: {
......@@ -1252,7 +1249,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
final long timeoutMillis) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
......@@ -1471,8 +1469,10 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic,
final String group,
final String clientAddr,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
......@@ -1519,7 +1519,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
......@@ -1593,7 +1594,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
public TopicList getSystemTopicList(
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
......@@ -1642,7 +1644,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
public boolean cleanExpiredConsumeQueue(final String addr,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
......@@ -1658,7 +1661,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
public boolean cleanUnusedTopicByAddr(final String addr,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
......@@ -1674,7 +1678,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId,
boolean jstack,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
......@@ -1731,7 +1736,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group,
Set<String> filterGroup,
long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
......@@ -1854,7 +1860,8 @@ public class MQClientAPIImpl {
}
public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
final boolean isOffline,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
requestHeader.setSrcGroup(srcGroup);
requestHeader.setDestGroup(destGroup);
......@@ -1902,13 +1909,15 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException,
public Set<String> getClusterList(String topic,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// todo:jodie
return Collections.EMPTY_SET;
}
public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException,
public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
long timeoutMillis) throws MQClientException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
requestHeader.setIsOrder(isOrder);
......@@ -1932,7 +1941,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient
......@@ -1948,7 +1958,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, long timeoutMillis) throws RemotingConnectException,
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
long timeoutMillis) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
......@@ -2028,7 +2039,8 @@ public class MQClientAPIImpl {
return configMap;
}
public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId,
public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic,
final int queueId,
final long index, final int count, final String consumerGroup,
final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
......
......@@ -413,7 +413,8 @@ public class DefaultMessageStore implements MessageStore {
return commitLog;
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
......@@ -717,6 +718,13 @@ public class DefaultMessageStore implements MessageStore {
long minLogicOffset = logicQueue.getMinLogicOffset();
SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE);
return getStoreTime(result);
}
return -1;
}
private long getStoreTime(SelectMappedBufferResult result) {
if (result != null) {
try {
final long phyOffset = result.getByteBuffer().getLong();
......@@ -728,8 +736,6 @@ public class DefaultMessageStore implements MessageStore {
result.release();
}
}
}
return -1;
}
......@@ -745,17 +751,7 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
if (logicQueue != null) {
SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
if (result != null) {
try {
final long phyOffset = result.getByteBuffer().getLong();
final int size = result.getByteBuffer().getInt();
long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
return storeTime;
} catch (Exception ignored) {
} finally {
result.release();
}
}
return getStoreTime(result);
}
return -1;
......@@ -955,7 +951,8 @@ public class DefaultMessageStore implements MessageStore {
}
}
public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
SocketAddress storeHost) {
Map<String, Long> messageIds = new HashMap<String, Long>();
if (this.shutdown) {
return messageIds;
......
......@@ -77,20 +77,7 @@ public class DefaultMessageStoreTest {
assertTrue(load);
master.start();
try {
for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
} finally {
master.shutdown();
master.destroy();
}
verifyThatMasterIsFunctional(totalMsgs, master);
}
public MessageExtBrokerInner buildMessage() {
......@@ -121,6 +108,10 @@ public class DefaultMessageStoreTest {
assertTrue(load);
master.start();
verifyThatMasterIsFunctional(totalMsgs, master);
}
private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
try {
for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册