/** * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags * * This file is part of SMaRt. * * SMaRt is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * SMaRt is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along with SMaRt. If not, see . */ package navigators.smart.paxosatwar.executionmanager; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.TreeMap; import java.util.concurrent.locks.ReentrantLock; import navigators.smart.paxosatwar.Consensus; import navigators.smart.paxosatwar.messages.MessageFactory; import navigators.smart.paxosatwar.messages.PaxosMessage; import navigators.smart.paxosatwar.roles.Acceptor; import navigators.smart.paxosatwar.roles.Proposer; import navigators.smart.reconfiguration.ReconfigurationManager; import navigators.smart.tom.core.TOMLayer; import navigators.smart.tom.util.Logger; /** * This classe manages consensus instances. Each execution is a consensus * instance. It can have several rounds if there were problems during consensus. * * @author Alysson */ public final class ExecutionManager { private ReconfigurationManager reconfManager; private Acceptor acceptor; // Acceptor role of the PaW algorithm private Proposer proposer; // Proposer role of the PaW algorithm //******* EDUARDO BEGIN: agora estas variaveis estao todas concentradas na Reconfigurationmanager **************// //private int me; // This process ID //private int[] acceptors; // Process ID's of all replicas, including this one //private int[] otherAcceptors; // Process ID's of all replicas, except this one //******* EDUARDO END **************// private Map executions = new TreeMap(); // Executions private ReentrantLock executionsLock = new ReentrantLock(); //lock for executions table // Paxos messages that were out of context (that didn't belong to the execution that was/is is progress private Map> outOfContext = new HashMap>(); // Proposes that were out of context (that belonged to future executions, and not the one running at the time) private Map outOfContextProposes = new HashMap(); private ReentrantLock outOfContextLock = new ReentrantLock(); //lock for out of context private boolean stopped = false; // Is the execution manager stopped? // When the execution manager is stopped, incoming paxos messages are stored here private Queue stoppedMsgs = new LinkedList(); private Round stoppedRound = null; // round at which the current execution was stoppped private ReentrantLock stoppedMsgsLock = new ReentrantLock(); //lock for stopped messages private TOMLayer tomLayer; // TOM layer associated with this execution manager private long initialTimeout; // initial timeout for rounds private int paxosHighMark; // Paxos high mark for consensus instances /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ private int revivalHighMark; // Paxos high mark for consensus instances when this replica EID equals 0 /******************************************************************/ /** * Creates a new instance of ExecutionManager * * @param acceptor Acceptor role of the PaW algorithm * @param proposer Proposer role of the PaW algorithm * @param acceptors Process ID's of all replicas, including this one * @param f Maximum number of replicas that can be faulty * @param me This process ID * @param initialTimeout initial timeout for rounds */ public ExecutionManager(ReconfigurationManager manager, Acceptor acceptor, Proposer proposer, int me, long initialTimeout) { //******* EDUARDO BEGIN **************// this.reconfManager = manager; this.acceptor = acceptor; this.proposer = proposer; //this.acceptors = manager.getCurrentViewProcesses(); //this.me = me; this.initialTimeout = initialTimeout; this.paxosHighMark = reconfManager.getStaticConf().getPaxosHighMark(); /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ this.revivalHighMark = reconfManager.getStaticConf().getRevivalHighMark(); /******************************************************************/ //******* EDUARDO END **************// } /** * Sets the TOM layer associated with this execution manager * @param tom The TOM layer associated with this execution manager */ public void setTOMLayer(TOMLayer tom) { this.tomLayer = tom; } /** * Returns the TOM layer associated with this execution manager * @return The TOM layer associated with this execution manager */ public TOMLayer getTOMLayer() { return tomLayer; } //******* EDUARDO BEGIN: isso tudo deve ser obtido a partir do ReconfigurationManager **************// /** * Returns this process ID * @return This process ID */ /* public int getProcessId() { return me; }*/ /** * Returns the process ID's of all replicas, including this one * @return Array of the process ID's of all replicas, including this one */ /* public int[] getAcceptors() { return acceptors; }*/ /** * Returns the process ID's of all replicas, except this one * @return Array of the process ID's of all replicas, except this one */ /*public int[] getOtherAcceptors() { if (otherAcceptors == null) { otherAcceptors = new int[acceptors.length - 1]; int c = 0; for (int i = 0; i < acceptors.length; i++) { if (acceptors[i] != me) { otherAcceptors[c++] = acceptors[i]; } } } return otherAcceptors; }*/ //******* EDUARDO END **************// /** * Returns the acceptor role of the PaW algorithm * @return The acceptor role of the PaW algorithm */ public Acceptor getAcceptor() { return acceptor; } public Proposer getProposer() { return proposer; } /** * Stops this execution manager */ public void stop() { Logger.println("(ExecutionManager.stoping) Stoping execution manager"); stoppedMsgsLock.lock(); this.stopped = true; if (tomLayer.getInExec() != -1) { stoppedRound = getExecution(tomLayer.getInExec()).getLastRound(); //stoppedRound.getTimeoutTask().cancel(); Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + stoppedRound.getExecution().getId()); } stoppedMsgsLock.unlock(); } /** * Restarts this execution manager */ public void restart() { Logger.println("(ExecutionManager.restart) Starting execution manager"); stoppedMsgsLock.lock(); this.stopped = false; // We don't want to use timeouts inside paxos anymore /*if (stoppedRound != null) { acceptor.scheduleTimeout(stoppedRound); stoppedRound = null; }*/ //process stopped messages while(!stoppedMsgs.isEmpty()){ acceptor.processMessage(stoppedMsgs.remove()); } stoppedMsgsLock.unlock(); Logger.println("(ExecutionManager.restart) Finished stopped messages processing"); } /** * Checks if this message can execute now. If it is not possible, * it is stored in outOfContextMessages * * @param msg the received message * @return true in case the message can be executed, false otherwise */ public final boolean checkLimits(PaxosMessage msg) { outOfContextLock.lock(); int consId = msg.getNumber(); int lastConsId = tomLayer.getLastExec(); /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ int currentConsId = tomLayer.getInExec(); /******************************************************************/ int msgType = msg.getPaxosType(); boolean isRetrievingState = tomLayer.isRetrievingState(); String type = null; switch (msgType) { case MessageFactory.PROPOSE: type = "PROPOSE"; break; case MessageFactory.WEAK: type = "WEAK"; break; case MessageFactory.STRONG: type = "STRONG"; break; case MessageFactory.DECIDE: type = "DECIDE"; break; case MessageFactory.FREEZE: type = "FREEZE"; break; case MessageFactory.COLLECT: type = "COLLECT"; break; default: type = ""; break; } if (isRetrievingState) Logger.println("(ExecutionManager.checkLimits) I'm waiting for a state"); Logger.println("(ExecutionManager.checkLimits) I received a message from replica "+ msg.getSender() + " for execution " + consId + " of type " + type); Logger.println("(ExecutionManager.checkLimits) I'm at execution " + currentConsId); Logger.println("(ExecutionManager.checkLimits) My last las execution is " + lastConsId); boolean canProcessTheMessage = false; if ( /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ // Isto serve para re-direccionar as mensagens para o out of context // enquanto a replica esta a receber o estado das outras e a actualizar-se isRetrievingState || // Is this replica retrieving a state? // Is this not a revived replica? (!(/*currentConsId == -1 &&*/ lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) && /******************************************************************/ (consId > lastConsId && (consId < (lastConsId + paxosHighMark)))) ) { // Is this message within the low and high marks (or maybe is the replica synchronizing) ? if(stopped) {//just an optimization to avoid calling the lock in normal case stoppedMsgsLock.lock(); if (stopped) { Logger.println("(ExecutionManager.checkLimits) adding message for execution "+consId+" to stoopped"); //the execution manager was stopped, the messages should be stored //for later processing (when the execution is restarted) stoppedMsgs.add(msg); } stoppedMsgsLock.unlock(); } if ( /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ // Isto serve para re-direccionar as mensagens para o out of context // enquanto a replica esta a receber o estado das outras e a actualizar-se isRetrievingState || /******************************************************************/ consId > (lastConsId + 1) ) { Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is out of context, adding it to out of context set"); //store it as an ahead of time message (out of context) addOutOfContextMessage(msg); } else { Logger.println("(ExecutionManager.checkLimits) message for execution "+consId+" can be processed"); canProcessTheMessage = true; } } else if ( /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ // Is this replica revived? (/*currentConsId == -1 &&*/ lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) || /******************************************************************/ (consId >= (lastConsId + paxosHighMark)) ) { // Does this message exceeds the high mark? /** System.out.println("##################################################################################"); System.out.println("- Ahead-of-time message (" + msg + ") discarded"); System.out.println("- If many messages of the same consensus are discarded, the replica can halt!"); System.out.println("- Try to increase the 'system.paxos.highMarc' configuration parameter."); System.out.println("- Last consensus executed: " + lastConsId); System.out.println("##################################################################################"); /*/ //TODO: at this point a new state should be recovered from other correct replicas /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is beyond the paxos highmark, adding it to out of context set"); addOutOfContextMessage(msg); tomLayer.requestState(this.reconfManager.getStaticConf().getProcessId(), this.reconfManager.getCurrentViewOtherAcceptors(), msg.getSender(), consId); /******************************************************************/ } outOfContextLock.unlock(); //br.ufsc.das.util.Logger.println("(checkLimits) Mensagem recebida nao estah dentro dos limites"); return canProcessTheMessage; } /** * Informs if there are messages till to be processed associated the specified consensus's execution * @param eid The ID for the consensus execution in question * @return True if there are still messages to be processed, false otherwise */ public boolean thereArePendentMessages(int eid) { outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ boolean result = outOfContextProposes.get(eid) != null || outOfContext.get(eid) != null; /******* END OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextLock.unlock(); return result; } /** * Removes a consensus's execution from this manager * @param id ID of the consensus's execution to be removed * @return The consensus's execution that was removed */ public Execution removeExecution(int id) { executionsLock.lock(); /******* BEGIN EXECUTIONS CRITICAL SECTION *******/ Execution execution = executions.remove(id); /******* END EXECUTIONS CRITICAL SECTION *******/ executionsLock.unlock(); outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextProposes.remove(id); outOfContext.remove(id); /******* END OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextLock.unlock(); return execution; } /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ public void removeOutOfContexts(int id) { outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ Integer[] keys = new Integer[outOfContextProposes.keySet().size()]; outOfContextProposes.keySet().toArray(keys); for (int i = 0; i < keys.length; i++) if (keys[i] <= id) outOfContextProposes.remove(keys[i]); keys = new Integer[outOfContext.keySet().size()]; outOfContext.keySet().toArray(keys); for (int i = 0; i < keys.length; i++) if (keys[i] <= id) outOfContext.remove(keys[i]); /******* END OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextLock.unlock(); } /********************************************************/ /** * Returns the specified consensus's execution * * @param eid ID of the consensus's execution to be returned * @return The consensus's execution specified */ public Execution getExecution(int eid) { executionsLock.lock(); /******* BEGIN EXECUTIONS CRITICAL SECTION *******/ Execution execution = executions.get(eid); if (execution == null) { //there is no execution with the given eid //let's create one... execution = new Execution(this, new Consensus(eid, System.currentTimeMillis()), initialTimeout); //...and add it to the executions table executions.put(eid, execution); /******* END EXECUTIONS CRITICAL SECTION *******/ executionsLock.unlock(); //now it is time to see if there are pending requests for this new //execution. First the propose... outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ PaxosMessage prop = outOfContextProposes.remove(eid); if (prop != null) { Logger.println("(ExecutionManager.createExecution) (" + eid + ") A processar PROPOSE recebido previamente fora de contexto"); acceptor.processMessage(prop); } //then we have to put the pending paxos messages List messages = outOfContext.remove(eid); if (messages != null) { Logger.println("(createExecution) (" + eid + ") A processar " + messages.size() + " mensagens recebidas previamente fora de contexto"); for (Iterator i = messages.iterator(); i.hasNext();) { acceptor.processMessage(i.next()); if (execution.isDecided()) { Logger.println("(ExecutionManager.createExecution) execution " + eid + " decided."); break; } } Logger.println("(createExecution) (" + eid + ") Terminei processamento de mensagens recebidas previamente fora de contexto"); } /******* END OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextLock.unlock(); } else { /******* END EXECUTIONS CRITICAL SECTION *******/ executionsLock.unlock(); } return execution; } /** * Stores a message established as being out of context (a message that * doesn't belong to current executing consensus). * * @param m Out of context message to be stored */ private void addOutOfContextMessage(PaxosMessage m) { outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ if (m.getPaxosType() == MessageFactory.PROPOSE) { outOfContextProposes.put(m.getNumber(), m); } else { List messages = outOfContext.get(m.getNumber()); if (messages == null) { messages = new LinkedList(); outOfContext.put(m.getNumber(), messages); } messages.add(m); if (outOfContext.size() % 1000 == 0) { Logger.println("(ExecutionManager.addOutOfContextMessage) out-of-context size: " + outOfContext.size()); } } /******* END OUTOFCONTEXT CRITICAL SECTION *******/ outOfContextLock.unlock(); } public String toString() { return stoppedMsgs.toString(); } }