diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index b758ac70609ff3b7b2da8abc34468c6fa9a479de..2b3793d4640c38238541eaa1b582776801cd77bf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { } + @Override + public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + } + @Override public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index 9deed0e3dfe8b400ef4aafa77523bcd70ffee833..16ab829f9d850bd5cb3880980bc01d7b45c00432 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -71,4 +71,12 @@ public interface OffsetStore { */ void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + /** + * @param mq + * @param offset + * @param isOneway + */ + void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 7752393549893f4303e436c6a87aa6bcab82ddfa..8b78c8f9515168eaa64fd25d85aecc3e23b5b188 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } case READ_FROM_STORE: { try { - long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); + long brokerOffset = this.fetchConsumeOffsetFromSnode(mq); AtomicLong offset = new AtomicLong(brokerOffset); this.updateOffset(mq, offset.get(), false); return brokerOffset; @@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - updateConsumeOffsetToBroker(mq, offset, true); + updateConsumeOffsetToSnode(mq, offset, true); } /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */ @Override - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { @@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore { snodeAddr, requestHeader, 1000 * 5); } } else { - throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); + throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null); + } + } + + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ + @Override + public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, + boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + if (null == findBrokerResult) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (findBrokerResult != null) { + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setCommitOffset(offset); + requestHeader.setEnodeName(mq.getBrokerName()); + if (isOneway) { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } else { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } + } else { + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + } + + private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr) { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + } + + if (snodeAddr != null) { + QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setEnodeName(mq.getBrokerName()); + return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( + snodeAddr, requestHeader, 1000 * 5); + } else { + throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null); } } + /** + * Preserved firstly,Compatible with RocketMQ 4.X Version + */ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java index 440e5779a43121db92b64820d00b2be02b8ceb61..673293a3cc2170672690008e03a521272949a16f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java @@ -16,9 +16,6 @@ */ package org.apache.rocketmq.client.exception; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.help.FAQUrl; - public class MQSnodeException extends MQBrokerException { public MQSnodeException(int responseCode, String errorMessage) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0080f1ee534ac5ecce2c04ca230c89124b6ff598..3fdc05d03fd3f52ef72a39ed0e0b039470b03e3a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { public SnodeClusterInfo getSnodeClusterInfo( //Todo Redifine snode exception final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException , MQBrokerException{ + RemotingSendRequestException, RemotingConnectException , MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 20a72d8e832db88db5ab8b1574532f6f55901e32..829158be0cbaadfce35ced6519039e1e315264f8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); } - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); + this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ed67077830ea9d6e2cd9681e882d612b925bbdca..23aee312bdfeb055748f56cac0b2fa8abfee820a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultMQPushConsumerImpl implements MQConsumerInner { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f881d981878463ab13d5c333e729866c0a19169b..6c2be58ea4e656ea3a01b2907b6e215384e66d18 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.common.ThreadLocalIndex; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.FindBrokerResult; @@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand;