From 5d3560dc5c3e546b09bc09b9298a2b4433835b85 Mon Sep 17 00:00:00 2001 From: duhengforever Date: Fri, 16 Nov 2018 20:56:17 +0800 Subject: [PATCH] Fixed Transactional message will be lost under extreme condition --- .../TransactionalMessageServiceImpl.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 15e5c84f..1c227af1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { - if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { + if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; @@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ * @param removeMap Op message map to determine whether a half message was responded by producer. * @param doneOpOffset Op Message which has been checked. * @param msgExt Half message - * @param checkImmunityTime User defined time to avoid being detected early. * @return Return true if put success, otherwise return false. */ - private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, MessageExt msgExt, - long checkImmunityTime) { - if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) { - String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); - if (null == prepareQueueOffsetStr) { - return putImmunityMsgBackToHalfQueue(msgExt); + private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, + MessageExt msgExt) { + String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); + if (null == prepareQueueOffsetStr) { + return putImmunityMsgBackToHalfQueue(msgExt); + } else { + long prepareQueueOffset = getLong(prepareQueueOffsetStr); + if (-1 == prepareQueueOffset) { + return false; } else { - long prepareQueueOffset = getLong(prepareQueueOffsetStr); - if (-1 == prepareQueueOffset) { - return false; + if (removeMap.containsKey(prepareQueueOffset)) { + long tmpOpOffset = removeMap.remove(prepareQueueOffset); + doneOpOffset.add(tmpOpOffset); + return true; } else { - if (removeMap.containsKey(prepareQueueOffset)) { - long tmpOpOffset = removeMap.remove(prepareQueueOffset); - doneOpOffset.add(tmpOpOffset); - return true; - } else { - return putImmunityMsgBackToHalfQueue(msgExt); - } + return putImmunityMsgBackToHalfQueue(msgExt); } - } - - } else { - return true; } } -- GitLab