diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index b758ac70609ff3b7b2da8abc34468c6fa9a479de..2b3793d4640c38238541eaa1b582776801cd77bf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { } + @Override + public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + } + @Override public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index 9deed0e3dfe8b400ef4aafa77523bcd70ffee833..16ab829f9d850bd5cb3880980bc01d7b45c00432 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -71,4 +71,12 @@ public interface OffsetStore { */ void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + /** + * @param mq + * @param offset + * @param isOneway + */ + void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 7752393549893f4303e436c6a87aa6bcab82ddfa..8b78c8f9515168eaa64fd25d85aecc3e23b5b188 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } case READ_FROM_STORE: { try { - long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); + long brokerOffset = this.fetchConsumeOffsetFromSnode(mq); AtomicLong offset = new AtomicLong(brokerOffset); this.updateOffset(mq, offset.get(), false); return brokerOffset; @@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - updateConsumeOffsetToBroker(mq, offset, true); + updateConsumeOffsetToSnode(mq, offset, true); } /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */ @Override - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { @@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore { snodeAddr, requestHeader, 1000 * 5); } } else { - throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); + throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null); + } + } + + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ + @Override + public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, + boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + if (null == findBrokerResult) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (findBrokerResult != null) { + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setCommitOffset(offset); + requestHeader.setEnodeName(mq.getBrokerName()); + if (isOneway) { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } else { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } + } else { + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + } + + private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr) { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + } + + if (snodeAddr != null) { + QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setEnodeName(mq.getBrokerName()); + return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( + snodeAddr, requestHeader, 1000 * 5); + } else { + throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null); } } + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java index 440e5779a43121db92b64820d00b2be02b8ceb61..673293a3cc2170672690008e03a521272949a16f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java @@ -16,9 +16,6 @@ */ package org.apache.rocketmq.client.exception; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.help.FAQUrl; - public class MQSnodeException extends MQBrokerException { public MQSnodeException(int responseCode, String errorMessage) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0080f1ee534ac5ecce2c04ca230c89124b6ff598..3fdc05d03fd3f52ef72a39ed0e0b039470b03e3a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { public SnodeClusterInfo getSnodeClusterInfo( //Todo Redifine snode exception final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException , MQBrokerException{ + RemotingSendRequestException, RemotingConnectException , MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 20a72d8e832db88db5ab8b1574532f6f55901e32..829158be0cbaadfce35ced6519039e1e315264f8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); } - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); + this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ed67077830ea9d6e2cd9681e882d612b925bbdca..23aee312bdfeb055748f56cac0b2fa8abfee820a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultMQPushConsumerImpl implements MQConsumerInner { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f881d981878463ab13d5c333e729866c0a19169b..6c2be58ea4e656ea3a01b2907b6e215384e66d18 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.common.ThreadLocalIndex; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.FindBrokerResult; @@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 569055dea2566068b0ca165d893b710dcd08b751..f060ad8e446bfe97a2e568392cd6442727c12f9b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest { field.set(mQClientFactory, mQClientAPIImpl); when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); + } @After diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 7224794982bc459dbf68a384d399a958682355fc..90de649396c31d4efb2bc851bc035b6fc306e5c4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish(); + Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 64d64f29a9489fbc77577c71170e552718be3263..d8bf38201a1545a657541d13507829ac446ffd3d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.HashSet; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageQueue; @@ -34,7 +33,6 @@ import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest { System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets"); String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis(); when(mQClientFactory.getClientId()).thenReturn(clientId); - when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false)); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); } @Test diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index e2793f565e6cf1974519624ec8ea7fcacebb9f99..210c05ea3c003d7025c5d8c55df2b0ec437b8f4b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -106,6 +106,8 @@ public class DefaultMQProducerTest { when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); + } @After diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index d593613b16632edea9229f3156fed956e8d0c4d6..8dcdf0c61dba53ebe9fd71455dd7679c2cda9098 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService { } @Override - public void registerSnode(SnodeConfig snodeConfig) throws Exception{ + public void registerSnode(SnodeConfig snodeConfig) throws Exception { List nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList(); RemotingCommand remotingCommand = new RemotingCommand(); RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();