提交 ae7b6751 编写于 作者: D dongeforever

Finish the logic for RETRY message of logic queue

上级 085f2392
......@@ -212,6 +212,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return CompletableFuture.completedFuture(response);
}
if (requestHeader.getOriginTopic() != null
&& !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
//here just do some fence in case of some unexpected offset is income
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
......
......@@ -715,48 +715,42 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
sendMessageBack(msg, delayLevel, null, mq);
}
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String desBrokerName = brokerName;
if (mq != null) {
String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
if (tmpBrokerName != null) {
desBrokerName = tmpBrokerName;
}
}
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
}
String brokerAddr = null;
if (null != desBrokerName) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)
|| (mq != null && MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName()))) {
sendMessageBackAsNormalMessage(msg);
} else {
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
}
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
sendMessageBackAsNormalMessage(msg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
private void sendMessageBackAsNormalMessage(MessageExt msg) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
void ackAsync(MessageExt message, String consumerGroup) {
......
......@@ -385,10 +385,9 @@ logicOffset的决策,依赖于上一个 PhysicalQueue 的最大位点。
* 所有位点相关的API,需要考虑 MappingItem endOffset,因为超过了 endOffset 可能已经不属于 当前 LogicQueue 了
* 新建 MappingItem,需要先获取 旧 MappingItem 的 endOffset
当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++
当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++
后续实现,可以考虑复用已经被清除掉的Physical,也即已经没有数据,位点从0开始。
#### 备机更新映射
当前,admin操作都是要求在Master操作的。因此,没有这个问题。
Command操作时,提前预判Master是否存在,如果不存在,则提前报错,减少中间失败率。
......@@ -411,10 +410,9 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前
#### 拉取消息时的 中断问题
当1个 PhysicalQueue 被拉取干净时,需要修正 nextBeginOffset 到下一个 PhysicalQueue。
如果没有处理好,则直接会导致拉取中断,无法前进。
#### pullResult 位点由谁设置的问题
类似于Batch,由客户端设置,避免服务端解开消息。
类似于Batch,由客户端设置,避免服务端解开消息:
在PullResultExt中新增字段 offsetDelta。
#### 远程读的性能问题
从实战经验来看,性能损耗几乎不计。
#### 使用习惯的改变
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册