diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 74a6e3341538976bb59123aaa65f7a65bac950da..c8c39199c74c96cca93c6d9026717f01f7d1e584 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.header.PushMessageHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; - public class ClientRemotingProcessor implements RequestProcessor { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; @@ -60,7 +60,7 @@ public class ClientRemotingProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) throws RemotingCommandException { - NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel; + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: @@ -77,6 +77,8 @@ public class ClientRemotingProcessor implements RequestProcessor { case RequestCode.CONSUME_MESSAGE_DIRECTLY: return this.consumeMessageDirectly(ctx, request); + case RequestCode.SNODE_PUSH_MESSAGE: + return this.processSnodePushMessage(ctx, request); default: break; } @@ -210,4 +212,21 @@ public class ClientRemotingProcessor implements RequestProcessor { return response; } + + private RemotingCommand processSnodePushMessage(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final PushMessageHeader requestHeader = + (PushMessageHeader) request + .decodeCommandCustomHeader(PushMessageHeader.class); + + final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); + boolean result = + this.mqClientFactory.processSnodePushMessage(msg, + requestHeader.getConsumerGroup(), + requestHeader.getTopic(), + requestHeader.getQueueId(), + requestHeader.getQueueOffset()); + + return null; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index ab93573061c0f50241f3e38b97a6a48849e1c0fb..585d0026c28457dc42a7b417ce059cc48a2a1ce0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -190,6 +190,8 @@ public class MQClientAPIImpl { this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.SNODE_PUSH_MESSAGE, this.clientRemotingProcessor, null); } public List getNameServerAddressList() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 303a0af2d2c214e6465a0caa32141d748d5b550b..5c22a8f6c359caf42a23fc7a964bcc5d9dda8113 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -26,7 +26,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -109,6 +112,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long queueFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0; private boolean realPushModel = true; + private final ConcurrentHashMap localConsumerOffset = new ConcurrentHashMap(); + private final ConcurrentHashMap pullStopped = new ConcurrentHashMap(); public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this(defaultMQPushConsumer, rpcHook, true); @@ -120,6 +125,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.rpcHook = rpcHook; this.realPushModel = realPushModel; if (realPushModel) { + log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup()); rebalanceImpl = new RebalanceRealPushImpl(this); } else { rebalanceImpl = new RebalancePushImpl(this); @@ -300,6 +306,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { + //Update local offset according remote offset + String localOffsetKey = pullRequest.getConsumerGroup() + + "@" + pullRequest.getMessageQueue().getTopic() + + "@" + pullRequest.getMessageQueue().getQueueId(); + AtomicLong localOffset = localConsumerOffset.get(localOffsetKey); + if (localOffset == null) { + localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); + } + localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset()); + pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); @@ -457,6 +473,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { + String localOffsetKey = pullRequest.getConsumerGroup() + + "@" + pullRequest.getMessageQueue().getTopic() + + "@" + pullRequest.getMessageQueue().getQueueId(); + if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { + //Stop pull request + log.info("Stop pull request, {}", localOffsetKey); + return; + } this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); } @@ -473,6 +497,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public void executePullRequestImmediately(final PullRequest pullRequest) { + String localOffsetKey = pullRequest.getConsumerGroup() + + "@" + pullRequest.getMessageQueue().getTopic() + + "@" + pullRequest.getMessageQueue().getQueueId(); + if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { + //Stop pull request + log.info("Stop pull request, {}", localOffsetKey); + return; + } this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); } @@ -1160,4 +1192,39 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private void tryToFindSnodePublishInfo() { this.mQClientFactory.updateSnodeInfoFromNameServer(); } + + public boolean processPushMessage(final MessageExt msg, + final String consumerGroup, + final String topic, + final int queueID, + final long offset) { + String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID; + AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey); + if (localOffset == null) { + log.info("Current Local offset have not set, initiallized to -1."); + this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); + return false; + } + if (localOffset.get() < offset) { + //should start pull message process + log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset); + return false; + } else { + //Stop pull request + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop == null) { + this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); + log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey); + } + pullStop = this.pullStopped.get(localOffsetKey); + if (!pullStop.get()) { + pullStop.set(true); + log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey); + } + //update local offset + localOffset.set(offset); + //submit to process queue + } + return true; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 7fa1541c9a11ef131a96b3f8e08a08ef76ed3733..4a81c3b7d5a89ade677e51ae03826d3fc41c359c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1376,4 +1376,18 @@ public class MQClientInstance { public ClientConfig getNettyClientConfig() { return nettyClientConfig; } + public boolean processSnodePushMessage(final MessageExt msg, + final String consumerGroup, + final String topic, + final int queueID, + final long offset) { + MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); + if (null != mqConsumerInner) { + DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; + consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset); + return true; + } + + return false; + } }