提交 d49da7b8 编写于 作者: S ShannonDing

Open real push model for push consumer

上级 673fead5
...@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; ...@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.RequestProcessor; ...@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientRemotingProcessor implements RequestProcessor { public class ClientRemotingProcessor implements RequestProcessor {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory; private final MQClientInstance mqClientFactory;
...@@ -60,7 +60,7 @@ public class ClientRemotingProcessor implements RequestProcessor { ...@@ -60,7 +60,7 @@ public class ClientRemotingProcessor implements RequestProcessor {
@Override @Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel; NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) { switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE: case RequestCode.CHECK_TRANSACTION_STATE:
...@@ -77,6 +77,8 @@ public class ClientRemotingProcessor implements RequestProcessor { ...@@ -77,6 +77,8 @@ public class ClientRemotingProcessor implements RequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY: case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request); return this.consumeMessageDirectly(ctx, request);
case RequestCode.SNODE_PUSH_MESSAGE:
return this.processSnodePushMessage(ctx, request);
default: default:
break; break;
} }
...@@ -210,4 +212,21 @@ public class ClientRemotingProcessor implements RequestProcessor { ...@@ -210,4 +212,21 @@ public class ClientRemotingProcessor implements RequestProcessor {
return response; return response;
} }
private RemotingCommand processSnodePushMessage(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final PushMessageHeader requestHeader =
(PushMessageHeader) request
.decodeCommandCustomHeader(PushMessageHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
boolean result =
this.mqClientFactory.processSnodePushMessage(msg,
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset());
return null;
}
} }
...@@ -190,6 +190,8 @@ public class MQClientAPIImpl { ...@@ -190,6 +190,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.SNODE_PUSH_MESSAGE, this.clientRemotingProcessor, null);
} }
public List<String> getNameServerAddressList() { public List<String> getNameServerAddressList() {
......
...@@ -26,7 +26,10 @@ import java.util.Map; ...@@ -26,7 +26,10 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
...@@ -109,6 +112,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -109,6 +112,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long queueFlowControlTimes = 0; private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true; private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true); this(defaultMQPushConsumer, rpcHook, true);
...@@ -120,6 +125,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -120,6 +125,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.realPushModel = realPushModel; this.realPushModel = realPushModel;
if (realPushModel) { if (realPushModel) {
log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this); rebalanceImpl = new RebalanceRealPushImpl(this);
} else { } else {
rebalanceImpl = new RebalancePushImpl(this); rebalanceImpl = new RebalancePushImpl(this);
...@@ -300,6 +306,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -300,6 +306,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override @Override
public void onSuccess(PullResult pullResult) { public void onSuccess(PullResult pullResult) {
if (pullResult != null) { if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData); subscriptionData);
...@@ -457,6 +473,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -457,6 +473,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
} }
...@@ -473,6 +497,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -473,6 +497,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
public void executePullRequestImmediately(final PullRequest pullRequest) { public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
} }
...@@ -1160,4 +1192,39 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1160,4 +1192,39 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void tryToFindSnodePublishInfo() { private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer(); this.mQClientFactory.updateSnodeInfoFromNameServer();
} }
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final int queueID,
final long offset) {
String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
}
return true;
}
} }
...@@ -1376,4 +1376,18 @@ public class MQClientInstance { ...@@ -1376,4 +1376,18 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() { public ClientConfig getNettyClientConfig() {
return nettyClientConfig; return nettyClientConfig;
} }
public boolean processSnodePushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final int queueID,
final long offset) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
return true;
}
return false;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册