diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 15e5c84ff2df5dc8cbc486f85022fff7811d3e20..1c227af150200adde921551b7575b965df96c58c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { - if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { + if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; @@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ * @param removeMap Op message map to determine whether a half message was responded by producer. * @param doneOpOffset Op Message which has been checked. * @param msgExt Half message - * @param checkImmunityTime User defined time to avoid being detected early. * @return Return true if put success, otherwise return false. */ - private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, MessageExt msgExt, - long checkImmunityTime) { - if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) { - String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); - if (null == prepareQueueOffsetStr) { - return putImmunityMsgBackToHalfQueue(msgExt); + private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, + MessageExt msgExt) { + String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); + if (null == prepareQueueOffsetStr) { + return putImmunityMsgBackToHalfQueue(msgExt); + } else { + long prepareQueueOffset = getLong(prepareQueueOffsetStr); + if (-1 == prepareQueueOffset) { + return false; } else { - long prepareQueueOffset = getLong(prepareQueueOffsetStr); - if (-1 == prepareQueueOffset) { - return false; + if (removeMap.containsKey(prepareQueueOffset)) { + long tmpOpOffset = removeMap.remove(prepareQueueOffset); + doneOpOffset.add(tmpOpOffset); + return true; } else { - if (removeMap.containsKey(prepareQueueOffset)) { - long tmpOpOffset = removeMap.remove(prepareQueueOffset); - doneOpOffset.add(tmpOpOffset); - return true; - } else { - return putImmunityMsgBackToHalfQueue(msgExt); - } + return putImmunityMsgBackToHalfQueue(msgExt); } - } - - } else { - return true; } }