/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 *
 * This file is part of SMaRt.
 *
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */

package navigators.smart.paxosatwar.executionmanager;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

import navigators.smart.paxosatwar.Consensus;
import navigators.smart.paxosatwar.messages.MessageFactory;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.util.Logger;


/**
 * This class manages consensus instances. Each execution is a consensus
 * instance. It can have several rounds if there were problems during consensus.
 *
 * @author Alysson
 */
public final class ExecutionManager {

    private ReconfigurationManager reconfManager;


P
pjsousa@gmail.com 已提交
51 52
    private Acceptor acceptor; // Acceptor role of the PaW algorithm
    private Proposer proposer; // Proposer role of the PaW algorithm
53 54 55 56 57 58

    //******* EDUARDO BEGIN: agora estas variaveis estao todas concentradas na Reconfigurationmanager **************//
    //private int me; // This process ID
    //private int[] acceptors; // Process ID's of all replicas, including this one
    //private int[] otherAcceptors; // Process ID's of all replicas, except this one
    //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

    private Map<Integer, Execution> executions = new TreeMap<Integer, Execution>(); // Executions
    private ReentrantLock executionsLock = new ReentrantLock(); //lock for executions table

    // Paxos messages that were out of context (that didn't belong to the execution that was/is is progress
    private Map<Integer, List<PaxosMessage>> outOfContext = new HashMap<Integer, List<PaxosMessage>>();
    // Proposes that were out of context (that belonged to future executions, and not the one running at the time)
    private Map<Integer, PaxosMessage> outOfContextProposes = new HashMap<Integer, PaxosMessage>();
    private ReentrantLock outOfContextLock = new ReentrantLock(); //lock for out of context

    private boolean stopped = false; // Is the execution manager stopped?
    // When the execution manager is stopped, incoming paxos messages are stored here
    private List<PaxosMessage> stoppedMsgs = new LinkedList<PaxosMessage>();
    private Round stoppedRound = null; // round at which the current execution was stoppped
    private ReentrantLock stoppedMsgsLock = new ReentrantLock(); //lock for stopped messages


    private TOMLayer tomLayer; // TOM layer associated with this execution manager
    private long initialTimeout; // initial timeout for rounds
    private int paxosHighMark; // Paxos high mark for consensus instances
79 80 81
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
    private int revivalHighMark; // Paxos high mark for consensus instances when this replica EID equals 0
    /******************************************************************/
P
pjsousa@gmail.com 已提交
82 83 84 85 86 87 88 89 90 91 92

    /**
     * Creates a new instance of ExecutionManager
     *
     * @param manager Reconfiguration manager; its static configuration supplies
     *                the Paxos high mark and the revival high mark
     * @param acceptor Acceptor role of the PaW algorithm
     * @param proposer Proposer role of the PaW algorithm
     * @param me This process ID (no longer stored; kept so existing callers still compile)
     * @param initialTimeout initial timeout for rounds
     */
    public ExecutionManager(ReconfigurationManager manager, Acceptor acceptor,
                                Proposer proposer, int me, long initialTimeout) {
        this.reconfManager = manager;
        this.acceptor = acceptor;
        this.proposer = proposer;
        this.initialTimeout = initialTimeout;

        this.paxosHighMark = reconfManager.getStaticConf().getPaxosHighMark();
        // Added by Joao to handle state transfer
        this.revivalHighMark = reconfManager.getStaticConf().getRevivalHighMark();
    }

    /**
     * Sets the TOM layer associated with this execution manager
     * @param tom The TOM layer associated with this execution manager
     */
    public void setTOMLayer(TOMLayer tom) {
        this.tomLayer = tom;
117

P
pjsousa@gmail.com 已提交
118 119 120 121 122 123 124 125 126 127
    }

    /**
     * Gives access to the TOM layer this execution manager works with.
     *
     * @return the TOM layer previously passed to setTOMLayer
     */
    public TOMLayer getTOMLayer() {
        return this.tomLayer;
    }

    //******* EDUARDO BEGIN: all of this must now be obtained from the ReconfigurationManager **************//

    /**
     * Returns this process ID
     * @return This process ID
     */
   /* public int getProcessId() {
        return me;
    }*/

    /**
     * Returns the process ID's of all replicas, including this one
     * @return Array of the process ID's of all replicas, including this one
     */
   /* public int[] getAcceptors() {
        return acceptors;
    }*/

    /**
     * Returns the process ID's of all replicas, except this one
     * @return Array of the process ID's of all replicas, except this one
     */
   /*public int[] getOtherAcceptors() {
        if (otherAcceptors == null) {
            otherAcceptors = new int[acceptors.length - 1];
            int c = 0;
            for (int i = 0; i < acceptors.length; i++) {
                if (acceptors[i] != me) {
                    otherAcceptors[c++] = acceptors[i];
                }
            }
        }

        return otherAcceptors;
    }*/

    //******* EDUARDO END **************//

    /**
     * Gives access to the acceptor role of the PaW algorithm.
     *
     * @return the acceptor passed to the constructor
     */
    public Acceptor getAcceptor() {
        return this.acceptor;
    }

    /**
     * Gives access to the proposer role of the PaW algorithm.
     *
     * @return the proposer passed to the constructor
     */
    public Proposer getProposer() {
        return this.proposer;
    }

    /**
     * Stops this execution manager. If a consensus is in execution
     * (the TOM layer reports an in-execution ID other than -1), the timeout
     * task of its last round is cancelled and the round is remembered so that
     * restart() can schedule its timeout again.
     */
    public void stop() {
        Logger.println("(ExecutionManager.stoping) Stoping execution manager");
        stoppedMsgsLock.lock();
        try {
            this.stopped = true;
            // Read the in-execution ID once so the check and the lookup use the same value
            int inExec = tomLayer.getInExec();
            if (inExec != -1) {
                stoppedRound = getExecution(inExec).getLastRound();
                stoppedRound.getTimeoutTask().cancel();
                Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + stoppedRound.getExecution().getId());
            }
        } finally {
            // Release the lock even if one of the calls above throws
            stoppedMsgsLock.unlock();
        }
    }

    /**
     * Restarts this execution manager: clears the stopped flag, schedules the
     * timeout of the round that was interrupted by stop() (if any) and hands
     * every message stored while stopped to the acceptor, in arrival order.
     */
    public void restart() {
        Logger.println("(ExecutionManager.restart) Starting execution manager");
        stoppedMsgsLock.lock();
        try {
            this.stopped = false;
            if (stoppedRound != null) {
                acceptor.scheduleTimeout(stoppedRound);
                stoppedRound = null;
            }

            // Process stopped messages. Always take the head of the list: removing
            // index i while also incrementing i skips every other message and
            // leaves about half of them in the list.
            while (!stoppedMsgs.isEmpty()) {
                acceptor.processMessage(stoppedMsgs.remove(0));
            }
        } finally {
            // Release the lock even if the acceptor throws
            stoppedMsgsLock.unlock();
        }
        Logger.println("(ExecutionManager.restart) Finished stopped messages processing");
    }

    /**
     * Checks if this message can execute now. If it is not possible,
     * it is stored in outOfContextMessages
     *
     * @param msg the received message
     * @return true in case the message can be executed, false otherwise
     */
    public final boolean checkLimits(PaxosMessage msg) {
        outOfContextLock.lock();
        int consId = msg.getNumber();
        int lastConsId = tomLayer.getLastExec();
226 227 228
        /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
        int currentConsId = tomLayer.getInExec();
        /******************************************************************/
229 230 231
        int msgType = msg.getPaxosType();
        boolean isRetrievingState = tomLayer.isRetrievingState();

232
        String type = null;
233 234
        switch (msgType) {
            case MessageFactory.PROPOSE:
235
                type = "PROPOSE";
236 237
                break;
            case MessageFactory.WEAK:
238
                type = "WEAK";
239 240
                break;
            case MessageFactory.STRONG:
241
                type = "STRONG";
242 243
                break;
            case MessageFactory.DECIDE:
244
                type = "DECIDE";
245 246
                break;
            case MessageFactory.FREEZE:
247
                type = "FREEZE";
248 249
                break;
            case MessageFactory.COLLECT:
250
                type = "COLLECT";
251 252
                break;
            default:
253
                type = "";
254 255
                break;
        }
256
        if (isRetrievingState) Logger.println("(ExecutionManager.checkLimits) I'm waiting for a state");
257
        Logger.println("(ExecutionManager.checkLimits) I received a message from replica "+ msg.getSender() + " for execution " + consId + " of type " + type);
258
        Logger.println("(ExecutionManager.checkLimits) I'm at execution " + currentConsId);
259
        Logger.println("(ExecutionManager.checkLimits) My last las execution is " + lastConsId);
P
pjsousa@gmail.com 已提交
260 261

        boolean canProcessTheMessage = false;
262

263
        if (
264

265 266 267 268 269
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */

                // Isto serve para re-direccionar as mensagens para o out of context
                // enquanto a replica esta a receber o estado das outras e a actualizar-se

270
                isRetrievingState || // Is this replica retrieving a state?
271 272 273

                // Is this not a revived replica?
                (!(currentConsId == -1 && lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) &&
274 275
                /******************************************************************/

276
                (consId > lastConsId  && (consId < (lastConsId + paxosHighMark))))
277 278 279

            ) { // Is this message within the low and high marks (or maybe is the replica synchronizing) ?

P
pjsousa@gmail.com 已提交
280 281 282 283 284 285 286 287 288 289 290
            if(stopped) {//just an optimization to avoid calling the lock in normal case
                stoppedMsgsLock.lock();
                if (stopped) {
                    Logger.println("(ExecutionManager.checkLimits) adding message for execution "+consId+" to stoopped");
                    //the execution manager was stopped, the messages should be stored
                    //for later processing (when the execution is restarted)
                    stoppedMsgs.add(msg);
                }
                stoppedMsgsLock.unlock();
            }

291 292 293 294 295
            if (
                    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */

                    // Isto serve para re-direccionar as mensagens para o out of context
                    // enquanto a replica esta a receber o estado das outras e a actualizar-se
296

297
                    isRetrievingState ||
298

299
                    /******************************************************************/
300

301
                    consId > (lastConsId + 1)
302
            ) {
303 304

                Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is out of context, adding it to out of context set");
P
pjsousa@gmail.com 已提交
305 306 307 308 309 310
                //store it as an ahead of time message (out of context)
                addOutOfContextMessage(msg);
            } else {
                Logger.println("(ExecutionManager.checkLimits) message for execution "+consId+" can be processed");
                canProcessTheMessage = true;
            }
311 312
        } else if (

313 314
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
                // Is this replica revived?
315
                (currentConsId == -1 && lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) ||
316 317
                /******************************************************************/

318
                (consId >= (lastConsId + paxosHighMark))
319
                ) { // Does this message exceeds the high mark?
320

321
            /**
P
pjsousa@gmail.com 已提交
322 323 324 325 326 327
            System.out.println("##################################################################################");
            System.out.println("- Ahead-of-time message (" + msg + ") discarded");
            System.out.println("- If many messages of the same consensus are discarded, the replica can halt!");
            System.out.println("- Try to increase the 'system.paxos.highMarc' configuration parameter.");
            System.out.println("- Last consensus executed: " + lastConsId);
            System.out.println("##################################################################################");
328
             /*/
P
pjsousa@gmail.com 已提交
329
            //TODO: at this point a new state should be recovered from other correct replicas
330 331

            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
332
            Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is beyond the paxos highmark, adding it to out of context set");
333
            addOutOfContextMessage(msg);
334 335
            tomLayer.requestState(this.reconfManager.getStaticConf().getProcessId(),
                    this.reconfManager.getCurrentViewOtherAcceptors(), msg.getSender(), consId);
336
            /******************************************************************/
P
pjsousa@gmail.com 已提交
337 338 339
        }
        outOfContextLock.unlock();

340
        //br.ufsc.das.util.Logger.println("(checkLimits) Mensagem recebida nao estah dentro dos limites");
P
pjsousa@gmail.com 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
        return canProcessTheMessage;
    }

    /**
     * Informs if there are messages till to be processed associated the specified consensus's execution
     * @param eid The ID for the consensus execution in question
     * @return True if there are still messages to be processed, false otherwise
     */
    public boolean thereArePendentMessages(int eid) {
        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/

        boolean result = outOfContextProposes.get(eid) != null || outOfContext.get(eid) != null;

        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();
357

P
pjsousa@gmail.com 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
        return result;
    }

    /**
     * Removes a consensus's execution from this manager
     * @param id ID of the consensus's execution to be removed
     * @return The consensus's execution that was removed
     */
    public Execution removeExecution(int id) {
        executionsLock.lock();
        /******* BEGIN EXECUTIONS CRITICAL SECTION *******/

        Execution execution = executions.remove(id);

        /******* END EXECUTIONS CRITICAL SECTION *******/
        executionsLock.unlock();
374

P
pjsousa@gmail.com 已提交
375 376 377 378 379 380 381 382 383 384 385
        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/

        outOfContextProposes.remove(id);
        outOfContext.remove(id);

        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();

        return execution;
    }
386
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
387
    public void removeOutOfContexts(int id) {
388 389 390 391

        outOfContextLock.lock();
        /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/

392 393 394 395 396 397 398 399 400
        Integer[] keys = new Integer[outOfContextProposes.keySet().size()];
        outOfContextProposes.keySet().toArray(keys);
        for (int i = 0; i < keys.length; i++)
                if (keys[i] <= id) outOfContextProposes.remove(keys[i]);

        keys = new Integer[outOfContext.keySet().size()];
        outOfContext.keySet().toArray(keys);
        for (int i = 0; i < keys.length; i++)
                if (keys[i] <= id) outOfContext.remove(keys[i]);
401 402 403 404 405

        /******* END OUTOFCONTEXT CRITICAL SECTION *******/
        outOfContextLock.unlock();
    }
    /********************************************************/
P
pjsousa@gmail.com 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422

    /**
     * Returns the specified consensus's execution
     *
     * @param eid ID of the consensus's execution to be returned
     * @return The consensus's execution specified
     */
    public Execution getExecution(int eid) {
        executionsLock.lock();
        /******* BEGIN EXECUTIONS CRITICAL SECTION *******/

        Execution execution = executions.get(eid);

        if (execution == null) {
            //there is no execution with the given eid

            //let's create one...
423
            execution = new Execution(this, new Consensus(eid, System.currentTimeMillis()),
P
pjsousa@gmail.com 已提交
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
                    initialTimeout);
            //...and add it to the executions table
            executions.put(eid, execution);

            /******* END EXECUTIONS CRITICAL SECTION *******/
            executionsLock.unlock();

            //now it is time to see if there are pending requests for this new
            //execution. First the propose...
            outOfContextLock.lock();
            /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/

            PaxosMessage prop = outOfContextProposes.remove(eid);
            if (prop != null) {
                Logger.println("(ExecutionManager.createExecution) (" + eid + ") A processar PROPOSE recebido previamente fora de contexto");
                acceptor.processMessage(prop);
            }

            //then we have to put the pending paxos messages
            List<PaxosMessage> messages = outOfContext.remove(eid);
            if (messages != null) {
                Logger.println("(createExecution) (" + eid + ") A processar " + messages.size() + " mensagens recebidas previamente fora de contexto");
                for (Iterator<PaxosMessage> i = messages.iterator(); i.hasNext();) {
                    acceptor.processMessage(i.next());
                    if (execution.isDecided()) {
                        Logger.println("(ExecutionManager.createExecution) execution " + eid + " decided.");
                        break;
                    }
                }
                Logger.println("(createExecution) (" + eid + ") Terminei processamento de mensagens recebidas previamente fora de contexto");
            }

            /******* END OUTOFCONTEXT CRITICAL SECTION *******/
            outOfContextLock.unlock();
        } else {
            /******* END EXECUTIONS CRITICAL SECTION *******/
            executionsLock.unlock();
        }
462

P
pjsousa@gmail.com 已提交
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500

        return execution;
    }


    /**
     * Stores a message established as being out of context (a message that
     * doesn't belong to current executing consensus). A PROPOSE replaces any
     * propose already stored for the same consensus ID; every other message is
     * appended to the list kept for its consensus ID.
     *
     * @param m Out of context message to be stored
     */
    private void addOutOfContextMessage(PaxosMessage m) {
        outOfContextLock.lock();
        try {
            int consId = m.getNumber();

            if (m.getPaxosType() == MessageFactory.PROPOSE) {
                outOfContextProposes.put(consId, m);
            } else {
                List<PaxosMessage> messages = outOfContext.get(consId);
                if (messages == null) {
                    messages = new LinkedList<PaxosMessage>();
                    outOfContext.put(consId, messages);
                }
                messages.add(m);

                // Report whenever the number of consensus IDs with stored messages is a multiple of 1000
                if (outOfContext.size() % 1000 == 0) {
                    Logger.println("(ExecutionManager.addOutOfContextMessage) out-of-context size: " + outOfContext.size());
                }
            }
        } finally {
            outOfContextLock.unlock();
        }
    }

    /**
     * Describes this execution manager by the messages stored while it was stopped.
     * The list is read without taking stoppedMsgsLock.
     *
     * @return the string form of the stopped-messages list
     */
    @Override
    public String toString() {
        final String pendingWhileStopped = stoppedMsgs.toString();
        return pendingWhileStopped;
    }
}