未验证 提交 e6d0afbf 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #746 from zongtanghu/snode

[ISSUE #743]Fix code style issue and adjust some unit test in client module for the client.
...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { ...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore {
} }
@Override
public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
@Override @Override
public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......
...@@ -71,4 +71,12 @@ public interface OffsetStore { ...@@ -71,4 +71,12 @@ public interface OffsetStore {
*/ */
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException; MQBrokerException, InterruptedException, MQClientException;
/**
* @param mq
* @param offset
* @param isOneway
*/
void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
} }
...@@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
case READ_FROM_STORE: { case READ_FROM_STORE: {
try { try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); long brokerOffset = this.fetchConsumeOffsetFromSnode(mq);
AtomicLong offset = new AtomicLong(brokerOffset); AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false); this.updateOffset(mq, offset.get(), false);
return brokerOffset; return brokerOffset;
...@@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true); updateConsumeOffsetToSnode(mq, offset, true);
} }
/** /**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/ */
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){ if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer(); this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
} }
if (snodeAddr != null) { if (snodeAddr != null) {
...@@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
snodeAddr, requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} }
} else { } else {
throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null);
}
}
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
snodeAddr, requestHeader, 1000 * 5);
} else {
throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
} }
} }
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){ if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer(); this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
} }
if (snodeAddr != null) { if (snodeAddr != null) {
......
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
*/ */
package org.apache.rocketmq.client.exception; package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
public class MQSnodeException extends MQBrokerException { public class MQSnodeException extends MQBrokerException {
public MQSnodeException(int responseCode, String errorMessage) { public MQSnodeException(int responseCode, String errorMessage) {
......
...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { ...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl {
public SnodeClusterInfo getSnodeClusterInfo( public SnodeClusterInfo getSnodeClusterInfo(
//Todo Redifine snode exception //Todo Redifine snode exception
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException , MQBrokerException{ RemotingSendRequestException, RemotingConnectException , MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
......
...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
} }
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway);
} }
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
......
...@@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; ...@@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public class DefaultMQPushConsumerImpl implements MQConsumerInner {
......
...@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; ...@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
...@@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
...@@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest { ...@@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest {
field.set(mQClientFactory, mQClientAPIImpl); field.set(mQClientFactory, mQClientAPIImpl);
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@After @After
......
...@@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest { ...@@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest {
}); });
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish();
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......
...@@ -20,7 +20,6 @@ import java.util.Collections; ...@@ -20,7 +20,6 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -34,7 +33,6 @@ import org.mockito.Mock; ...@@ -34,7 +33,6 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest { ...@@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets"); System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis(); String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId); when(mQClientFactory.getClientId()).thenReturn(clientId);
when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@Test @Test
......
...@@ -106,6 +106,8 @@ public class DefaultMQProducerTest { ...@@ -106,6 +106,8 @@ public class DefaultMQProducerTest {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK)); .thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@After @After
......
...@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService {
} }
@Override @Override
public void registerSnode(SnodeConfig snodeConfig) throws Exception{ public void registerSnode(SnodeConfig snodeConfig) throws Exception {
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList(); List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand(); RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader(); RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册