提交 5d3560dc 编写于 作者: D duhengforever 提交者: von gosling

Fixed Transactional message will be lost under extreme condition

上级 f27dc755
...@@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
if (null != checkImmunityTimeStr) { if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) { if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1; newOffset = i + 1;
i++; i++;
continue; continue;
...@@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
* @param removeMap Op message map to determine whether a half message was responded by producer. * @param removeMap Op message map to determine whether a half message was responded by producer.
* @param doneOpOffset Op Message which has been checked. * @param doneOpOffset Op Message which has been checked.
* @param msgExt Half message * @param msgExt Half message
* @param checkImmunityTime User defined time to avoid being detected early.
* @return Return true if put success, otherwise return false. * @return Return true if put success, otherwise return false.
*/ */
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt, private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
long checkImmunityTime) { MessageExt msgExt) {
if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) { String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); if (null == prepareQueueOffsetStr) {
if (null == prepareQueueOffsetStr) { return putImmunityMsgBackToHalfQueue(msgExt);
return putImmunityMsgBackToHalfQueue(msgExt); } else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr);
if (-1 == prepareQueueOffset) {
return false;
} else { } else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr); if (removeMap.containsKey(prepareQueueOffset)) {
if (-1 == prepareQueueOffset) { long tmpOpOffset = removeMap.remove(prepareQueueOffset);
return false; doneOpOffset.add(tmpOpOffset);
return true;
} else { } else {
if (removeMap.containsKey(prepareQueueOffset)) { return putImmunityMsgBackToHalfQueue(msgExt);
long tmpOpOffset = removeMap.remove(prepareQueueOffset);
doneOpOffset.add(tmpOpOffset);
return true;
} else {
return putImmunityMsgBackToHalfQueue(msgExt);
}
} }
} }
} else {
return true;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册