提交 3c264c8f 编写于 作者: S ShannonDing

Polish push message process in client(add broker name)

上级 8e124956
......@@ -218,12 +218,12 @@ public class ClientRemotingProcessor implements RequestProcessor {
final PushMessageHeader requestHeader =
(PushMessageHeader) request
.decodeCommandCustomHeader(PushMessageHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
boolean result =
this.mqClientFactory.processSnodePushMessage(msg,
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getEnodeName(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset());
......
......@@ -308,12 +308,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
......@@ -474,9 +475,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
......@@ -498,9 +500,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
......@@ -1197,21 +1200,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() < offset) {
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
......@@ -1233,10 +1238,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic, "", queueID);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset);
}
return true;
}
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
}
......@@ -1376,15 +1376,19 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
public boolean processSnodePushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
log.debug("Recieve:processSnodePushMessage :{}-{}-{}-{}-{}",
consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册