diff --git a/src/main/java/bftsmart/consensus/roles/Acceptor.java b/src/main/java/bftsmart/consensus/roles/Acceptor.java index 4c092c7a010f2b4f1b046db00aa918dd18e7f24a..63f65b31bae90b1627d8ff126b75c887c4132a25 100644 --- a/src/main/java/bftsmart/consensus/roles/Acceptor.java +++ b/src/main/java/bftsmart/consensus/roles/Acceptor.java @@ -197,9 +197,9 @@ public final class Acceptor { Consensus consensus = executionManager.getConsensus(msg.getNumber()); // 该版本添加特殊处理,后续需要考虑优化掉该处理 - if (msg.getType() != MessageFactory.PROPOSE && consensus.getLastEpoch() != null && consensus.getLastEpoch().getTimestamp() > msg.getEpoch()) { - msg = new ConsensusMessage(msg.getType(),msg.getNumber(),consensus.getLastEpoch().getTimestamp(), msg.getSender(), msg.getValue()); - } +// if (msg.getType() != MessageFactory.PROPOSE && consensus.getLastEpoch() != null && consensus.getLastEpoch().getTimestamp() > msg.getEpoch()) { +// msg = new ConsensusMessage(msg.getType(),msg.getNumber(),consensus.getLastEpoch().getTimestamp(), msg.getSender(), msg.getValue()); +// } // 检查消息的epoch if (!checkSucc(consensus, msg.getEpoch())) { diff --git a/src/main/java/bftsmart/tom/core/ExecutionManager.java b/src/main/java/bftsmart/tom/core/ExecutionManager.java index b3fb6862a61c311e11831a8a4f2d3ca5c65bc4b1..da7774d532a0a94f119ee6f46bdb0615ebcc6e7a 100644 --- a/src/main/java/bftsmart/tom/core/ExecutionManager.java +++ b/src/main/java/bftsmart/tom/core/ExecutionManager.java @@ -517,44 +517,56 @@ public final class ExecutionManager { } public void processOutOfContext(Consensus consensus) { - outOfContextLock.lock(); - /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ - - //then we have to put the pending paxos messages - List messages = outOfContext.remove(consensus.getId()); - - // 处于同一轮共识中的消息,保证write的处理先于accept; - // order start - List orderedMessages = new LinkedList(); - - for (ConsensusMessage consensusMessage : messages) { - if (consensusMessage.getType() == MessageFactory.WRITE) { - messages.remove(consensusMessage); - orderedMessages.add(consensusMessage); - } - } - - for (ConsensusMessage consensusMessage : messages) { - orderedMessages.add(consensusMessage); - } - - // order end - - if (orderedMessages != null) { - LOGGER.debug("(ExecutionManager.processOutOfContext) {} Processing other {} out of context messages.", consensus.getId(), orderedMessages.size()); - - for (Iterator i = orderedMessages.iterator(); i.hasNext();) { - acceptor.processMessage(i.next()); - if (consensus.isDecided()) { - LOGGER.debug("(ExecutionManager.processOutOfContext) consensus {} decided.", consensus.getId()); - break; + try { + outOfContextLock.lock(); + /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ + + LOGGER.info("[ExecutionManager] processOutOfContext start!"); + //then we have to put the pending paxos messages + List messages = outOfContext.remove(consensus.getId()); + + // 处于同一轮共识中的消息,保证write的处理先于accept; + // order start + +// List orderedMessages = new LinkedList(); +// +// if (messages != null && messages.size() > 0) { +// +// for (ConsensusMessage consensusMessage : messages) { +// if (consensusMessage.getType() == MessageFactory.WRITE) { +// orderedMessages.add(consensusMessage); +// messages.remove(consensusMessage); +// } +// } +// } +// +// if (messages != null && messages.size() > 0) { +// for (ConsensusMessage consensusMessage : messages) { +// orderedMessages.add(consensusMessage); +// } +// } + // order end + + if (messages != null && messages.size() > 0) { + LOGGER.debug("(ExecutionManager.processOutOfContext) {} Processing other {} out of context messages.", consensus.getId(), messages.size()); + + for (Iterator i = messages.iterator(); i.hasNext();) { + acceptor.processMessage(i.next()); + if (consensus.isDecided()) { + LOGGER.debug("(ExecutionManager.processOutOfContext) consensus {} decided.", consensus.getId()); + break; + } } + LOGGER.debug("(ExecutionManager.processOutOfContext) cid {} Finished out of context processing", consensus.getId()); } - LOGGER.debug("(ExecutionManager.processOutOfContext) cid {} Finished out of context processing", consensus.getId()); - } - /******* END OUTOFCONTEXT CRITICAL SECTION *******/ - outOfContextLock.unlock(); + /******* END OUTOFCONTEXT CRITICAL SECTION *******/ + } catch (Exception e) { + LOGGER.error("(ExecutionManager.processOutOfContext) exception, e = {}!", e.getMessage()); + throw e; + } finally { + outOfContextLock.unlock(); + } } /** @@ -564,33 +576,42 @@ public final class ExecutionManager { * @param m Out of context message to be stored */ public void addOutOfContextMessage(ConsensusMessage m) { - outOfContextLock.lock(); - /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ - if (m.getType() == MessageFactory.PROPOSE) { - LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); - outOfContextProposes.put(m.getNumber(), m); - } else { - List messages = outOfContext.get(m.getNumber()); - if (messages == null) { - messages = new LinkedList(); - outOfContext.put(m.getNumber(), messages); + try { + outOfContextLock.lock(); + /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ + if (m.getType() == MessageFactory.PROPOSE) { LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); - messages.add(m); + outOfContextProposes.put(m.getNumber(), m); } else { - for (ConsensusMessage consensusMessage : messages) { - // 过滤掉无效的消息:write ,accept消息,如果来自同一轮共识,且属于同一个节点来源,只保留时间戳最新的 - if ((m.getSender() == consensusMessage.getSender()) && m.getEpoch() >= consensusMessage.getEpoch()) { - LOGGER.debug("(ExecutionManager.addOutOfContextMessage) removing {}", m); - messages.remove(consensusMessage); - LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); - messages.add(m); - } + List messages = outOfContext.get(m.getNumber()); + if (messages == null) { + messages = new LinkedList(); + outOfContext.put(m.getNumber(), messages); +// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); +// messages.add(m); } +// else { +// for (ConsensusMessage consensusMessage : messages) { +// // 过滤掉无效的消息:write ,accept消息,如果来自同一轮共识,且属于同一个节点来源,只保留时间戳最新的 +// if ((m.getSender() == consensusMessage.getSender()) && m.getEpoch() >= consensusMessage.getEpoch()) { +// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) removing {}", m); +// messages.remove(consensusMessage); +// LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); +// messages.add(m); +// } +// } +// } + LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m); + messages.add(m); } - } - /******* END OUTOFCONTEXT CRITICAL SECTION *******/ - outOfContextLock.unlock(); + /******* END OUTOFCONTEXT CRITICAL SECTION *******/ + } catch (Exception e) { + LOGGER.error("(ExecutionManager.addOutOfContextMessage) exception, e = {}!", e.getMessage()); + throw e; + } finally { + outOfContextLock.unlock(); + } } @Override