ExecutionManager.java 26.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
17

18
import bftsmart.consensus.Consensus;
19
import bftsmart.consensus.Decision;
S
shaozhuguang 已提交
20
import bftsmart.consensus.Epoch;
21
import bftsmart.consensus.messages.ConsensusMessage;
S
shaozhuguang 已提交
22
import bftsmart.consensus.messages.MessageFactory;
23 24
import bftsmart.consensus.roles.Acceptor;
import bftsmart.consensus.roles.Proposer;
25
import bftsmart.reconfiguration.ReplicaTopology;
26
import bftsmart.reconfiguration.ServerViewController;
Z
zhangshuang 已提交
27
import bftsmart.tom.server.defaultservices.DefaultRecoverable;
28
import org.slf4j.LoggerFactory;
29

S
shaozhuguang 已提交
30 31 32
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

33 34

/**
35 36
 * This class manages consensus instances. It can have several epochs if
 * there were problems during consensus.
37 38 39 40 41
 *
 * @author Alysson
 */
public final class ExecutionManager {

42
    private ReplicaTopology topology;
43 44 45 46 47 48 49
    private Acceptor acceptor; // Acceptor role of the PaW algorithm
    private Proposer proposer; // Proposer role of the PaW algorithm
    //******* EDUARDO BEGIN: now these variables are all concentrated in the Reconfigurationmanager **************//
    //private int me; // This process ID
    //private int[] acceptors; // Process ID's of all replicas, including this one
    //private int[] otherAcceptors; // Process ID's of all replicas, except this one
    //******* EDUARDO END **************//
50 51 52
    private Map<Integer, Consensus> consensuses = new TreeMap<Integer, Consensus>(); // Consensuses
    private ReentrantLock consensusesLock = new ReentrantLock(); //lock for consensuses table
    // Paxos messages that were out of context (i.e. that did not belong to the consensus that was/is in progress)
53
    private Map<Integer, List<ConsensusMessage>> outOfContext = new HashMap<Integer, List<ConsensusMessage>>();
54
    // Proposes that were out of context (that belonged to future consensuses, and not the one running at the time)
55
    private Map<Integer, ConsensusMessage> outOfContextProposes = new HashMap<Integer, ConsensusMessage>();
56 57 58
    private ReentrantLock outOfContextLock = new ReentrantLock(); //lock for out of context
    private boolean stopped = false; // Is the execution manager stopped?
    // When the execution manager is stopped, incoming paxos messages are stored here
59
    private Queue<ConsensusMessage> stoppedMsgs = new LinkedList<ConsensusMessage>();
60
    private Epoch stoppedEpoch = null; // epoch at which the current consensus was stopped
61 62 63
    private ReentrantLock stoppedMsgsLock = new ReentrantLock(); //lock for stopped messages
    private TOMLayer tomLayer; // TOM layer associated with this execution manager
    private int paxosHighMark; // Paxos high mark for consensus instances
64
    
65
    /** THIS IS JOAO'S CODE, TO HANDLE THE STATE TRANSFER */
66
    
67
    private int revivalHighMark; // Paxos high mark for consensus instances when this replica CID equals 0
68 69
    private int timeoutHighMark; // Paxos high mark for a timed-out replica
    
70
    private int lastRemovedCID = 0; // Addition to fix memory leak
71 72

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ExecutionManager.class);
73
        
74
    /******************************************************************/
75 76 77 78 79
    
    // This is the new way of storing info about the leader,
    // uncoupled from any consensus instance
    private int currentLeader;
    
80 81 82
    /**
     * Creates a new instance of ExecutionManager.
     *
     * @param topology replica topology; its static configuration supplies the
     *                 paxos/revival/timeout high marks and its current view
     *                 supplies the initial leader
     * @param acceptor Acceptor role of the PaW algorithm
     * @param proposer Proposer role of the PaW algorithm
     * @param me       This process ID (not used by this constructor)
     */
    public ExecutionManager(ReplicaTopology topology, Acceptor acceptor,
                            Proposer proposer, int me) {
        this.topology = topology;
        this.acceptor = acceptor;
        this.proposer = proposer;

        // High marks read once from the static configuration.
        this.paxosHighMark = topology.getStaticConf().getPaxosHighMark();
        this.revivalHighMark = topology.getStaticConf().getRevivalHighMark();
        this.timeoutHighMark = topology.getStaticConf().getTimeoutHighMark();

        // Initial leader: the first process of the current view, or 0 when the view is empty.
        this.currentLeader = (topology.getCurrentViewProcesses().length > 0)
                ? topology.getCurrentViewProcesses()[0]
                : 0;
    }
    
    /**
     * Set the current leader
     * @param leader Current leader
     */
    public void setNewLeader (int leader) {
            this.currentLeader = leader;
115 116
    }

117 118 119 120 121 122 123 124
    /**
     * Returns the leader currently stored in this manager.
     * @return ID of the current leader
     */
    public int getCurrentLeader() {
        return this.currentLeader;
    }
        
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
    /**
     * Associates a TOM layer with this execution manager.
     * @param tom The TOM layer to associate with this execution manager
     */
    public void setTOMLayer(TOMLayer tom) {
        tomLayer = tom;
    }

    /**
     * Gives access to the TOM layer associated with this execution manager.
     * @return The TOM layer previously set through {@link #setTOMLayer(TOMLayer)}
     */
    public TOMLayer getTOMLayer() {
        return this.tomLayer;
    }

    /**
     * Gives access to the acceptor role of the PaW algorithm.
     * @return The acceptor passed to the constructor
     */
    public Acceptor getAcceptor() {
        return this.acceptor;
    }

    /**
     * Gives access to the proposer role of the PaW algorithm.
     * @return The proposer passed to the constructor
     */
    public Proposer getProposer() {
        return this.proposer;
    }

    
    /**
     * Tells whether this execution manager is currently stopped.
     * @return true between a call to {@link #stop()} and the next {@link #restart()}
     */
    public boolean stopped() {
        return this.stopped;
    }
J
Joao Sousa 已提交
158 159 160 161 162

    /**
     * Tells whether any message is waiting in the stopped-messages queue.
     * @return true if the queue holds at least one message
     */
    public boolean hasMsgs() {
        boolean noneQueued = stoppedMsgs.isEmpty();
        return !noneQueued;
    }

163
    public Queue<ConsensusMessage> getStoppedMsgs() {
164 165 166 167 168 169 170 171 172 173
        return stoppedMsgs;
    }
    
    /**
     * Discards every message held in the stopped-messages queue.
     */
    public void clearStopped() {
        this.stoppedMsgs.clear();
    }
    /**
     * Stops this execution manager
     */
    public void stop() {
Z
zhangshuang 已提交
174
        LOGGER.debug("(ExecutionManager.stoping) Stoping execution manager");
175 176 177
        stoppedMsgsLock.lock();
        this.stopped = true;
        if (tomLayer.getInExec() != -1) {
178
            stoppedEpoch = getConsensus(tomLayer.getInExec()).getLastEpoch();
179
            //stoppedEpoch.getTimeoutTask().cancel();
Z
zhangshuang 已提交
180
//            if (stoppedEpoch != null) LOGGER.debug("(ExecutionManager.stop) Stoping epoch " + stoppedEpoch.getTimestamp() + " of consensus " + tomLayer.getInExec());
Z
zhangshuang 已提交
181

182
            if (stoppedEpoch != null) LOGGER.debug("(ExecutionManager.stop) I am proc {} Stoping epoch {} of consensus {}", topology.getStaticConf().getProcessId(), stoppedEpoch.getTimestamp(), tomLayer.getInExec());
Z
zhangshuang 已提交
183
//            if (stoppedEpoch != null) System.out.println("(ExecutionManager.stop) I am proc  " + controller.getStaticConf().getProcessId() + " Stoping epoch " + stoppedEpoch.getTimestamp() + " of consensus " + tomLayer.getInExec());
184 185 186 187 188 189 190 191 192 193
        }
        stoppedMsgsLock.unlock();
    }

    
    
    /**
     * Restarts this execution manager
     */
    public void restart() {
Z
zhangshuang 已提交
194
        LOGGER.debug("(ExecutionManager.restart) Starting execution manager");
195 196 197 198 199
        stoppedMsgsLock.lock();
        this.stopped = false;

        //process stopped messages
        while (!stoppedMsgs.isEmpty()) {
200
            ConsensusMessage pm = stoppedMsgs.remove();
Z
zhangshuang 已提交
201 202 203
            // 添加LC过程中收到的共识消息到超出预期消息队列,由该队列统一触发超预期消息的处理流程
            addOutOfContextMessage(pm);
//            if (pm.getNumber() > tomLayer.getLastExec()) acceptor.processMessage(pm);
204 205
        }
        stoppedMsgsLock.unlock();
Z
zhangshuang 已提交
206
        LOGGER.debug("(ExecutionManager.restart) Finished stopped messages processing");
207 208 209 210 211 212 213 214 215
    }

    /**
     * Checks if this message can execute now. If it is not possible,
     * it is stored in outOfContextMessages
     *
     * @param msg the received message
     * @return true in case the message can be executed, false otherwise
     */
216
    public final boolean checkLimits(ConsensusMessage msg) {
217 218 219 220 221
        outOfContextLock.lock();
        
        int lastConsId = tomLayer.getLastExec();
        
        int inExec = tomLayer.getInExec();
222 223

        // If rollback occurs, this node no longer processes new messages, wait state transfer
Z
zhangshuang 已提交
224
        boolean rollHappend = tomLayer.execManager.getConsensus(lastConsId).getPrecomputeRolled();
225
        
Z
zhangshuang 已提交
226 227 228
        LOGGER.debug("(ExecutionManager.checkLimits) Received message {}", msg);
        LOGGER.debug("(ExecutionManager.checkLimits) I'm at consensus {} and my last consensus is {}",
                inExec, lastConsId);
229 230 231
        
        boolean isRetrievingState = tomLayer.isRetrievingState();

Z
zhangshuang 已提交
232 233
        boolean isReady = tomLayer.isReady();

234
        if (isRetrievingState) {
Z
zhangshuang 已提交
235
            LOGGER.debug("(ExecutionManager.checkLimits) I'm waiting for a state");
236 237 238 239 240 241 242 243
        }

        boolean canProcessTheMessage = false;

        /** THIS IS JOAO'S CODE, TO HANDLE THE STATE TRANSFER */
        // This serves to re-direct the messages to the out of context
        // while a replica is receiving the state of the others and updating itself
        if (isRetrievingState || // Is this replica retrieving a state?
Z
zhangshuang 已提交
244
                !isReady ||
245 246 247 248 249 250 251
                (!(lastConsId == -1 && msg.getNumber() >= (lastConsId + revivalHighMark)) && //not a recovered replica
                (msg.getNumber() > lastConsId && (msg.getNumber() < (lastConsId + paxosHighMark))) && // not an ahead of time message
                !(stopped && msg.getNumber() >= (lastConsId + timeoutHighMark)))) { // not a timed-out replica which needs to fetch the state

            if (stopped) {//just an optimization to avoid calling the lock in normal case
                stoppedMsgsLock.lock();
                if (stopped) {
Z
zhangshuang 已提交
252
                    LOGGER.debug("(ExecutionManager.checkLimits) I am proc {} adding message for consensus {} to stopped, is retrive state : {}, is ready : {}, last cid is {}, in exe cid is {}", topology.getStaticConf().getProcessId(), msg.getNumber(), isRetrievingState, isReady, lastConsId, inExec);
Z
zhangshuang 已提交
253
//                    System.out.println("(ExecutionManager.checkLimits) I am proc " + controller.getStaticConf().getProcessId() + " adding message for consensus " + msg.getNumber() + " to stoopped");
254
                    //the execution manager was stopped, the messages should be stored
255
                    //for later processing (when the consensus is restarted)
256 257 258 259
                    stoppedMsgs.add(msg);
                }
                stoppedMsgsLock.unlock();
            } else {
Z
zhangshuang 已提交
260
                if (isRetrievingState || !isReady ||
261 262
                        msg.getNumber() > (lastConsId + 1) || 
                        (inExec != -1 && inExec < msg.getNumber()) || 
263
                        (inExec == -1 && msg.getType() != MessageFactory.PROPOSE)) { //not propose message for the next consensus
Z
zhangshuang 已提交
264

Z
zhangshuang 已提交
265 266
                    LOGGER.info("(ExecutionManager.checkLimits) I am proc {}, Message for consensus {} is out of context, adding it to out of context set, last cid is {}, in exe cid is {}, isRetrievingState = {}, isReady = {}", topology.getStaticConf().getProcessId(),
                            msg.getNumber(), lastConsId, inExec, isRetrievingState, isReady);
Z
zhangshuang 已提交
267

268

269
                    //System.out.println("(ExecutionManager.checkLimits) Message for consensus " + 
270 271 272 273
                     //       msg.getNumber() + " is out of context, adding it to out of context set; isRetrievingState="+isRetrievingState);
                    
                    
                    addOutOfContextMessage(msg);
274
                } else if (!rollHappend){ //can process!
275
                    LOGGER.info("(ExecutionManager.checkLimits)I am proc {} ,message for consensus {} can be processed", this.topology.getStaticConf().getProcessId(), msg.getNumber());
276 277 278 279 280 281 282 283 284
            
                    //Logger.debug = false;
                    canProcessTheMessage = true;
                }
            }
        } else if ((lastConsId == -1 && msg.getNumber() >= (lastConsId + revivalHighMark)) || //recovered...
                (msg.getNumber() >= (lastConsId + paxosHighMark)) ||  //or too late replica...
                (stopped && msg.getNumber() >= (lastConsId + timeoutHighMark))) { // or a timed-out replica which needs to fetch the state

285 286
            LOGGER.info("(ExecutionManager.checkLimits) I am proc {}, start state transfer, last cid is {}, recv msg cid is {}, in cid is {}", topology.getStaticConf().getProcessId(), lastConsId, msg.getNumber(), inExec);
            LOGGER.info("I am proc {}, revivalHighMark is {}, paxosHighMark is {}, timeoutHighMark is {}", topology.getStaticConf().getProcessId(), revivalHighMark, paxosHighMark, timeoutHighMark);
287 288
            //Start state transfer
            /** THIS IS JOAO'S CODE, FOR HANLDING THE STATE TRANSFER */
Z
zhangshuang 已提交
289
            LOGGER.info("(ExecutionManager.checkLimits) Message for consensus {} is beyond the paxos highmark, adding it to out of context set", msg.getNumber());
290 291
            addOutOfContextMessage(msg);

292
            if (topology.getStaticConf().isStateTransferEnabled()) {
293 294 295 296
                //Logger.debug = true;
                tomLayer.getStateManager().analyzeState(msg.getNumber());
            }
            else {
297 298 299 300
                LOGGER.error("##################################################################################");
                LOGGER.error("- Ahead-of-time message discarded");
                LOGGER.error("- If many messages of the same consensus are discarded, the replica can halt!");
                LOGGER.error("- Try to increase the 'system.paxos.highMarc' configuration parameter.");
301
                LOGGER.error("- Last consensus executed: {}", lastConsId);
302
                LOGGER.error("##################################################################################");
303 304 305 306 307 308 309 310 311 312
            }
            /******************************************************************/
        }
        
        outOfContextLock.unlock();

        return canProcessTheMessage;
    }

    /**
313
     * Informs if there are messages till to be processed associated the specified consensus
314
     * @param cid The ID for the consensus in question
315 316
     * @return True if there are still messages to be processed, false otherwise
     */
317
    public boolean receivedOutOfContextPropose(int cid) {
318 319
        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
320
        boolean result = outOfContextProposes.get(cid) != null;
321 322 323 324 325 326
        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();

        return result;
    }

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
    /**
     * Tells whether non-PROPOSE messages stored as out of context exist for the
     * given consensus.
     * @param cid The ID for the consensus in question
     * @return True if an out-of-context message list is stored for {@code cid}, false otherwise
     */
    public boolean receivedOutOfContextWriteAndAccept(int cid) {
        outOfContextLock.lock();
        try {
            // Lookup happens inside the out-of-context critical section.
            return outOfContext.get(cid) != null;
        } finally {
            outOfContextLock.unlock();
        }
    }


Z
zhangshuang 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
    /**
     * Removes exactly one consensus from this manager, together with the
     * out-of-context messages stored for it. Use when rolling back.
     * @param id ID of the consensus to be removed
     */
    public void removeSingleConsensus(int id) {
        // Consensus table first...
        consensusesLock.lock();
        try {
            consensuses.remove(id);
        } finally {
            consensusesLock.unlock();
        }

        // ...then both out-of-context tables.
        outOfContextLock.lock();
        try {
            outOfContextProposes.remove(id);
            outOfContext.remove(id);
        } finally {
            outOfContextLock.unlock();
        }
    }

365
    /**
366 367 368
     * Removes a consensus from this manager
     * @param id ID of the consensus to be removed
     * @return The consensus that was removed
369
     */
370 371 372 373
    public Consensus removeConsensus(int id) {
        consensusesLock.lock();
        /******* BEGIN CONSENSUS CRITICAL SECTION *******/
        Consensus consensus = consensuses.remove(id);
374

375 376 377 378
        // Addition to fix memory leak
        for (int i = lastRemovedCID; i < id; i++) consensuses.remove(i);
        lastRemovedCID = id;
        
379 380
        /******* END CONSENSUS CRITICAL SECTION *******/
        consensusesLock.unlock();
381 382 383 384 385 386 387 388 389

        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextProposes.remove(id);
        outOfContext.remove(id);

        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();

390
        return consensus;
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
    }

    /**
     * Drops every out-of-context entry (proposes and other messages) whose
     * consensus ID is lower than or equal to {@code id}. Part of the state
     * transfer handling code.
     * @param id highest consensus ID whose out-of-context entries are removed
     */
    public void removeOutOfContexts(int id) {
        outOfContextLock.lock();
        try {
            for (Iterator<Integer> it = outOfContextProposes.keySet().iterator(); it.hasNext();) {
                if (it.next() <= id) {
                    it.remove();
                }
            }

            for (Iterator<Integer> it = outOfContext.keySet().iterator(); it.hasNext();) {
                if (it.next() <= id) {
                    it.remove();
                }
            }
        } finally {
            outOfContextLock.unlock();
        }
    }

    /********************************************************/
    /**
420
     * Returns the specified consensus
421
     *
422
     * @param cid ID of the consensus to be returned
423
     * @return The consensus specified
424
     */
425 426 427
    public Consensus getConsensus(int cid) {
        consensusesLock.lock();
        /******* BEGIN CONSENSUS CRITICAL SECTION *******/
428
        
429
        Consensus consensus = consensuses.get(cid);
430

431
        if (consensus == null) {//there is no consensus created with the given cid
432
            //let's create one...
433
            Decision dec = new Decision(cid);
434

435
            consensus = new Consensus(this, dec);
436

437 438
            //...and add it to the consensuses table
            consensuses.put(cid, consensus);
439 440
        }

441 442
        /******* END CONSENSUS CRITICAL SECTION *******/
        consensusesLock.unlock();
443

444
        return consensus;
445
    }
446 447 448 449 450 451 452 453 454 455 456 457

    /**
     * update consensus roll field
     *
     * @param cid ID of the consensus to be update
     * @return void
     */
    public void updateConsensus(int cid) {
        consensusesLock.lock();

       Consensus consensus =  consensuses.get(cid);

Z
zhangshuang 已提交
458
       consensus.setPrecomputeRolled();
459 460 461 462 463 464 465 466 467

       consensuses.remove(cid);

       consensuses.put(cid, consensus);

       consensusesLock.unlock();

    }

468 469 470
    public boolean isDecidable(int cid) {
        if (receivedOutOfContextPropose(cid)) {
            Consensus cons = getConsensus(cid);
471
            ConsensusMessage prop = outOfContextProposes.get(cons.getId());
472
            Epoch epoch = cons.getEpoch(prop.getEpoch(), topology);
473
            byte[] propHash = tomLayer.computeHash(prop.getValue());
474
            List<ConsensusMessage> msgs = outOfContext.get(cid);
475 476 477
            int countWrites = 0;
            int countAccepts = 0;
            if (msgs != null) {
478
                for (ConsensusMessage msg : msgs) {
Z
zhangshuang 已提交
479
                    // 对于Accept类型的共识消息,需要通过getOrigPropValue取到预计算之前的提议值hash
480
                    if (msg.getEpoch() == epoch.getTimestamp() &&
Z
zhangshuang 已提交
481
                            (Arrays.equals(propHash, msg.getValue()) || Arrays.equals(propHash, msg.getOrigPropValue()))) {
482
                        
483 484
                        if (msg.getType() == MessageFactory.WRITE) countWrites++;
                        else if (msg.getType() == MessageFactory.ACCEPT) countAccepts++;
485 486 487
                    }
                }
            }
488 489 490
            if(topology.getStaticConf().isBFT()){
            	return ((countWrites > (2*topology.getCurrentViewF())) &&
            			(countAccepts > (2*topology.getCurrentViewF())));
491
            }else{
492
            	return (countAccepts > topology.getQuorum());
493 494 495 496
            }
        }
        return false;
    }
497
    public void processOutOfContextPropose(Consensus consensus) {
498 499 500
        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
        
501
        ConsensusMessage prop = outOfContextProposes.remove(consensus.getId());
502
        if (prop != null) {
Z
zhangshuang 已提交
503
            LOGGER.debug("(ExecutionManager.processOutOfContextPropose) {} Processing out of context propose", consensus.getId());
504 505 506 507 508 509 510
            acceptor.processMessage(prop);
        }

        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();
    }

511 512 513 514 515 516 517 518
    /**
     * Processes the non-PROPOSE out-of-context messages stored for the given
     * consensus by delegating to {@link #processOutOfContext(Consensus)} while
     * holding the out-of-context lock. The lock is released even when the
     * delegate rethrows an exception.
     *
     * @param consensus consensus whose stored messages are to be processed
     */
    public void processOutOfContextWriteAndAccept(Consensus consensus) {
        outOfContextLock.lock();
        try {
            processOutOfContext(consensus);
        } finally {
            outOfContextLock.unlock();
        }
    }

519
    public void processOutOfContext(Consensus consensus) {
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
        try {
            outOfContextLock.lock();
            /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/

            LOGGER.info("[ExecutionManager] processOutOfContext start!");
            //then we have to put the pending paxos messages
            List<ConsensusMessage> messages = outOfContext.remove(consensus.getId());

            // 处于同一轮共识中的消息,保证write的处理先于accept;
            // order start

//            List<ConsensusMessage> orderedMessages = new LinkedList<ConsensusMessage>();
//
//            if (messages != null && messages.size() > 0) {
//
//                for (ConsensusMessage consensusMessage : messages) {
//                    if (consensusMessage.getType() == MessageFactory.WRITE) {
//                        orderedMessages.add(consensusMessage);
//                        messages.remove(consensusMessage);
//                    }
//                }
//            }
//
//            if (messages != null && messages.size() > 0) {
//                for (ConsensusMessage consensusMessage : messages) {
//                    orderedMessages.add(consensusMessage);
//                }
//            }
            // order end

            if (messages != null && messages.size() > 0) {
                LOGGER.debug("(ExecutionManager.processOutOfContext) {} Processing other {} out of context messages.", consensus.getId(), messages.size());

                for (Iterator<ConsensusMessage> i = messages.iterator(); i.hasNext();) {
                    acceptor.processMessage(i.next());
                    if (consensus.isDecided()) {
                        LOGGER.debug("(ExecutionManager.processOutOfContext) consensus {} decided.", consensus.getId());
                        break;
                    }
559
                }
560
                LOGGER.debug("(ExecutionManager.processOutOfContext) cid {} Finished out of context processing", consensus.getId());
561 562
            }

563 564 565 566 567 568 569
            /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        } catch (Exception e) {
            LOGGER.error("(ExecutionManager.processOutOfContext) exception, e = {}!", e.getMessage());
            throw e;
        } finally {
            outOfContextLock.unlock();
        }
570 571 572 573 574 575 576 577
    }

    /**
     * Stores a message established as being out of context (a message that
     * doesn't belong to current executing consensus).
     *
     * @param m Out of context message to be stored
     */
578
    public void addOutOfContextMessage(ConsensusMessage m) {
579 580 581 582
        try {
            outOfContextLock.lock();
            /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
            if (m.getType() == MessageFactory.PROPOSE) {
Z
zhangshuang 已提交
583
                LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
584
                outOfContextProposes.put(m.getNumber(), m);
Z
zhangshuang 已提交
585
            } else {
586 587 588 589 590 591
                List<ConsensusMessage> messages = outOfContext.get(m.getNumber());
                if (messages == null) {
                    messages = new LinkedList<ConsensusMessage>();
                    outOfContext.put(m.getNumber(), messages);
//                    LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
//                    messages.add(m);
Z
zhangshuang 已提交
592
                }
593 594 595 596 597 598 599 600 601 602 603 604 605
//                else {
//                    for (ConsensusMessage consensusMessage : messages) {
//                        // 过滤掉无效的消息:write ,accept消息,如果来自同一轮共识,且属于同一个节点来源,只保留时间戳最新的
//                        if ((m.getSender() == consensusMessage.getSender()) && m.getEpoch() >= consensusMessage.getEpoch()) {
//                            LOGGER.debug("(ExecutionManager.addOutOfContextMessage) removing {}", m);
//                            messages.remove(consensusMessage);
//                            LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
//                            messages.add(m);
//                        }
//                    }
//                }
                LOGGER.debug("(ExecutionManager.addOutOfContextMessage) adding {}", m);
                messages.add(m);
606 607
            }

608 609 610 611 612 613 614
            /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        } catch (Exception e) {
            LOGGER.error("(ExecutionManager.addOutOfContextMessage) exception, e = {}!", e.getMessage());
            throw e;
        } finally {
            outOfContextLock.unlock();
        }
615 616 617 618 619 620
    }

    /**
     * Describes this manager by the content of its stopped-messages queue.
     * @return the string form of the stopped-messages queue
     */
    @Override
    public String toString() {
        final String queued = stoppedMsgs.toString();
        return queued;
    }
Z
zhangshuang 已提交
621

622 623
    // 避免重复预计算
    public void preComputeRollback(Consensus cons) {
Z
zhangshuang 已提交
624 625 626
        if (cons != null && cons.getPrecomputed() && !cons.getPrecomputeCommited()) {

					DefaultRecoverable defaultRecoverable = getAcceptor().getDefaultExecutor();
627 628 629 630 631 632 633 634

					for (Epoch epoch : cons.getEpochs().values()) {
					    if (epoch != null && epoch.getBatchId() != null ) {
                            LOGGER.info("I am proc {}, pre compute rollback occur!, cid = {}, epoch = {}", topology.getStaticConf().getProcessId(), cons.getId(), epoch.getTimestamp());
                            defaultRecoverable.preComputeRollback(cons.getId(), epoch.getBatchId());
                        }
                    }
                    cons.setPrecomputed(false);
Z
zhangshuang 已提交
635 636
				}
    }
637
}