提交 1aa6d197 编写于 作者: H huzongtang

[ISSUE #743]fix codestyle issue and adjust some issues in client module.

上级 81b4293a
...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { ...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore {
} }
@Override
public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
@Override @Override
public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......
...@@ -71,4 +71,12 @@ public interface OffsetStore { ...@@ -71,4 +71,12 @@ public interface OffsetStore {
*/ */
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException; MQBrokerException, InterruptedException, MQClientException;
/**
* @param mq
* @param offset
* @param isOneway
*/
void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
} }
...@@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -88,7 +88,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
case READ_FROM_STORE: { case READ_FROM_STORE: {
try { try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); long brokerOffset = this.fetchConsumeOffsetFromSnode(mq);
AtomicLong offset = new AtomicLong(brokerOffset); AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false); this.updateOffset(mq, offset.get(), false);
return brokerOffset; return brokerOffset;
...@@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -195,20 +195,20 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true); updateConsumeOffsetToSnode(mq, offset, true);
} }
/** /**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/ */
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){ if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer(); this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
} }
if (snodeAddr != null) { if (snodeAddr != null) {
...@@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -226,16 +226,71 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
snodeAddr, requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} }
} else { } else {
throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null);
}
}
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
snodeAddr, requestHeader, 1000 * 5);
} else {
throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
} }
} }
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){ if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer(); this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
} }
if (snodeAddr != null) { if (snodeAddr != null) {
......
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
*/ */
package org.apache.rocketmq.client.exception; package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
public class MQSnodeException extends MQBrokerException { public class MQSnodeException extends MQBrokerException {
public MQSnodeException(int responseCode, String errorMessage) { public MQSnodeException(int responseCode, String errorMessage) {
......
...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { ...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl {
public SnodeClusterInfo getSnodeClusterInfo( public SnodeClusterInfo getSnodeClusterInfo(
//Todo Redifine snode exception //Todo Redifine snode exception
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException , MQBrokerException{ RemotingSendRequestException, RemotingConnectException , MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
......
...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
} }
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway);
} }
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
......
...@@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; ...@@ -73,7 +73,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public class DefaultMQPushConsumerImpl implements MQConsumerInner {
......
...@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; ...@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
...@@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册