提交 c3afcdd6 编写于 作者: Z zhangshuang

fix jira 666 failed ,recovery code, fix later

上级 e80bb2f9
......@@ -197,9 +197,9 @@ public final class Acceptor {
Consensus consensus = executionManager.getConsensus(msg.getNumber());
// 该版本添加特殊处理,后续需要考虑优化掉该处理
if (msg.getType() != MessageFactory.PROPOSE && consensus.getLastEpoch() != null && consensus.getLastEpoch().getTimestamp() > msg.getEpoch()) {
msg = new ConsensusMessage(msg.getType(),msg.getNumber(),consensus.getLastEpoch().getTimestamp(), msg.getSender(), msg.getValue());
}
// if (msg.getType() != MessageFactory.PROPOSE && consensus.getLastEpoch() != null && consensus.getLastEpoch().getTimestamp() > msg.getEpoch()) {
// msg = new ConsensusMessage(msg.getType(),msg.getNumber(),consensus.getLastEpoch().getTimestamp(), msg.getSender(), msg.getValue());
// }
// 检查消息的epoch
if (!checkSucc(consensus, msg.getEpoch())) {
......
......@@ -517,44 +517,56 @@ public final class ExecutionManager {
}
public void processOutOfContext(Consensus consensus) {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
//then we have to put the pending paxos messages
List<ConsensusMessage> messages = outOfContext.remove(consensus.getId());
// 处于同一轮共识中的消息,保证write的处理先于accept;
// order start
List<ConsensusMessage> orderedMessages = new LinkedList<ConsensusMessage>();
for (ConsensusMessage consensusMessage : messages) {
if (consensusMessage.getType() == MessageFactory.WRITE) {
messages.remove(consensusMessage);
orderedMessages.add(consensusMessage);
}
}
for (ConsensusMessage consensusMessage : messages) {
orderedMessages.add(consensusMessage);
}
// order end
if (orderedMessages != null) {
LOGGER.debug("(ExecutionManager.processOutOfContext) {} Processing other {} out of context messages.", consensus.getId(), orderedMessages.size());
for (Iterator<ConsensusMessage> i = orderedMessages.iterator(); i.hasNext();) {
acceptor.processMessage(i.next());
if (consensus.isDecided()) {
LOGGER.debug("(ExecutionManager.processOutOfContext) consensus {} decided.", consensus.getId());
break;
try {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
LOGGER.info("[ExecutionManager] processOutOfContext start!");
//then we have to put the pending paxos messages
List<ConsensusMessage> messages = outOfContext.remove(consensus.getId());
// 处于同一轮共识中的消息,保证write的处理先于accept;
// order start
// List<ConsensusMessage> orderedMessages = new LinkedList<ConsensusMessage>();
//
// if (messages != null && messages.size() > 0) {
//
// for (ConsensusMessage consensusMessage : messages) {
// if (consensusMessage.getType() == MessageFactory.WRITE) {
// orderedMessages.add(consensusMessage);
// messages.remove(consensusMessage);
// }
// }
// }
//
// if (messages != null && messages.size() > 0) {
// for (ConsensusMessage consensusMessage : messages) {
// orderedMessages.add(consensusMessage);
// }
// }
// order end
if (messages != null && messages.size() > 0) {
LOGGER.debug("(ExecutionManager.processOutOfContext) {} Processing other {} out of context messages.", consensus.getId(), messages.size());
for (Iterator<ConsensusMessage> i = messages.iterator(); i.hasNext();) {
acceptor.processMessage(i.next());
if (consensus.isDecided()) {
LOGGER.debug("(ExecutionManager.processOutOfContext) consensus {} decided.", consensus.getId());
break;
}
}
LOGGER.debug("(ExecutionManager.processOutOfContext) cid {} Finished out of context processing", consensus.getId());
}
LOGGER.debug("(ExecutionManager.processOutOfContext) cid {} Finished out of context processing", consensus.getId());
}
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
outOfContextLock.unlock();
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
} catch (Exception e) {
LOGGER.error("(ExecutionManager.processOutOfContext) exception, e = {}!", e.getMessage());
throw e;
} finally {
outOfContextLock.unlock();
}
}
/**
......@@ -564,33 +576,42 @@ public final class ExecutionManager {
* @param m Out of context message to be stored
*/
public void addOutOfContextMessage(ConsensusMessage m) {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
if (m.getType() == MessageFactory.PROPOSE) {
LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
outOfContextProposes.put(m.getNumber(), m);
} else {
List<ConsensusMessage> messages = outOfContext.get(m.getNumber());
if (messages == null) {
messages = new LinkedList<ConsensusMessage>();
outOfContext.put(m.getNumber(), messages);
try {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
if (m.getType() == MessageFactory.PROPOSE) {
LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
messages.add(m);
outOfContextProposes.put(m.getNumber(), m);
} else {
for (ConsensusMessage consensusMessage : messages) {
// 过滤掉无效的消息:write ,accept消息,如果来自同一轮共识,且属于同一个节点来源,只保留时间戳最新的
if ((m.getSender() == consensusMessage.getSender()) && m.getEpoch() >= consensusMessage.getEpoch()) {
LOGGER.debug("(ExecutionManager.addOutOfContextMessage) removing {}", m);
messages.remove(consensusMessage);
LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
messages.add(m);
}
List<ConsensusMessage> messages = outOfContext.get(m.getNumber());
if (messages == null) {
messages = new LinkedList<ConsensusMessage>();
outOfContext.put(m.getNumber(), messages);
// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
// messages.add(m);
}
// else {
// for (ConsensusMessage consensusMessage : messages) {
// // 过滤掉无效的消息:write ,accept消息,如果来自同一轮共识,且属于同一个节点来源,只保留时间戳最新的
// if ((m.getSender() == consensusMessage.getSender()) && m.getEpoch() >= consensusMessage.getEpoch()) {
// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) removing {}", m);
// messages.remove(consensusMessage);
// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
// messages.add(m);
// }
// }
// }
LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
messages.add(m);
}
}
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
outOfContextLock.unlock();
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
} catch (Exception e) {
LOGGER.error("(ExecutionManager.addOutOfContextMessage) exception, e = {}!", e.getMessage());
throw e;
} finally {
outOfContextLock.unlock();
}
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册