/** * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags * * This file is part of SMaRt. * * SMaRt is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * SMaRt is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along with SMaRt. If not, see . */ package navigators.smart.tom.core; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import navigators.smart.paxosatwar.Consensus; import navigators.smart.statemanagment.StateManager; import navigators.smart.statemanagment.TransferableState; import navigators.smart.tom.TOMRequestReceiver; import navigators.smart.tom.core.messages.TOMMessage; import navigators.smart.tom.util.BatchReader; import navigators.smart.tom.util.DebugInfo; import navigators.smart.tom.util.Logger; import navigators.smart.tom.util.TOMConfiguration; import navigators.smart.tom.util.TOMUtil; /** * This class implements a thread which will deliver totally ordered requests to the application * */ public class DeliveryThread extends Thread { private LinkedBlockingQueue decided = new LinkedBlockingQueue(); // decided consensus private TOMLayer tomLayer; // TOM layer private RequestRecover requestRecover; // TODO: isto ainda vai ser usado? private TOMRequestReceiver receiver; // Object that receives requests from clients private TOMConfiguration conf; /** * Creates a new instance of DeliveryThread * @param tomLayer TOM layer * @param receiver Object that receives requests from clients * @param conf TOM configuration */ public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, TOMConfiguration conf) { super("Delivery Thread"); this.tomLayer = tomLayer; this.receiver = receiver; this.conf = conf; this.requestRecover = new RequestRecover(tomLayer, conf); } /** * Invoked by the TOM layer, to deliver a decide consensus * @param cons Consensus established as being decided */ public void delivery(Consensus cons) { try { decided.put(cons); Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. decided size=" + decided.size()); } catch (Exception e) { e.printStackTrace(System.out); } } /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ private ReentrantLock deliverLock = new ReentrantLock(); private Condition canDeliver = deliverLock.newCondition(); public void deliverLock() { deliverLock.lock(); Logger.println("Obti o deliver lock"); } public void deliverUnlock() { deliverLock.unlock(); Logger.println("Soltei o deliver lock"); } public void canDeliver() { canDeliver.signalAll(); } public void update(TransferableState state) { //deliverLock.lock(); System.out.println("Vou actualizar-me"); receiver.setState(state.getState()); tomLayer.lm.addLeaderInfo(state.getLastCheckpointEid(), state.getLastCheckpointRound(), state.getLastCheckpointLeader()); int lastCheckpointEid = state.getLastCheckpointEid(); int lastEid = state.getLastEid(); for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) { try { byte[] batch = state.getMessageBatch(eid).batch; // take a batch tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader); // obtain an array of requests from the taken consensus BatchReader batchReader = new BatchReader(batch, conf.getUseSignatures()==1); Logger.println("(DeliveryThread.update) interpreting and verifying batched requests."); int numberOfMessages = batchReader.getNumberOfMessages(); TOMMessage[] requests = new TOMMessage[numberOfMessages]; for (int i = 0; i < numberOfMessages; i++) { //read the message and its signature from the batch int messageSize = batchReader.getNextMessageSize(); byte[] message = new byte[messageSize]; batchReader.getNextMessage(message); byte[] signature = null; if (conf.getUseSignatures()==1){ signature = new byte[TOMUtil.getSignatureSize()]; batchReader.getNextSignature(signature); } try { DataInputStream ois = new DataInputStream(new ByteArrayInputStream(message)); TOMMessage tm = new TOMMessage(); tm.readExternal(ois); if (Logger.debug) tm.setSequence(new DebugInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader)); /******* Deixo isto comentado, pois nao me parece necessario ****/ /******* Alem disso, esta informacao nao vem no TransferableState **** tm.consensusStartTime = cons.startTime; tm.consensusExecutionTime = cons.executionTime; tm.consensusBatchSize = cons.batchSize; /*********************************************************************/ requests[i] = tm; //requests[i] = (TOMMessage) ois.readObject(); tomLayer.clientsManager.requestOrdered(requests[i]); } catch (Exception e) { e.printStackTrace(System.out); } } //obtain the nonce and timestamps to be delivered to the application long timestamp = batchReader.getTimestamp(); byte[] nonces = new byte[batchReader.getNumberOfNonces()]; if (nonces.length > 0) { batchReader.getNonces(nonces); } //deliver the request to the application (receiver) for (int i = 0; i < requests.length; i++) { requests[i].timestamp = timestamp; requests[i].nonces = nonces; /******* Deixo isto comentado, pois nao me parece necessario **********/ /******* Alem disso, esta informacao nao vem no TransferableState ********** requests[i].requestTotalLatency = System.currentTimeMillis()-cons.startTime; /***************************************************************************/ receiver.receiveOrderedMessage(requests[i]); } /****** Julgo que isto nao sera necessario *********** if (conf.getCheckpoint_period() > 0) { if ((eid > 0) && (eid % conf.getCheckpoint_period() == 0)) { Logger.println("(DeliveryThread.update) Performing checkpoint for consensus " + eid); byte[] state2 = receiver.getState(); tomLayer.saveState(state2, eid); //TODO: possivelmente fazer mais alguma coisa } else { Logger.println("(DeliveryThread.update) Storing message batch in the state log for consensus " + eid); tomLayer.saveBatch(batch, eid); //TODO: possivelmente fazer mais alguma coisa } } */ } catch (Exception e) { e.printStackTrace(System.out); } } //set this consensus as the last executed tomLayer.setLastExec(lastEid); //define the last stable consensus... the stable consensus can //be removed from the leaderManager and the executionManager if (lastEid > 2) { int stableConsensus = lastEid - 3; //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus); tomLayer.execManager.removeOutOfContexts(stableConsensus); } //define that end of this execution //stateManager.setWaiting(-1); tomLayer.setNoExec(); decided.clear(); Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid); //verify if there is a next proposal to be executed //(it only happens if the previous consensus were decided in a //round > 0 /** Nao consigo perceber se isto tem utilidade neste contexto *****/ //int nextExecution = lastEid + 1; //if(tomLayer.acceptor.executeAcceptedPendent(nextExecution)) { //Logger.println("(DeliveryThread.update) Executed propose for " + nextExecution); //} /******************************************************************/ //canDeliver.signalAll(); //deliverLock.unlock(); } /********************************************************/ /** * This is the code for the thread. It delivers decided consensus to the TOM request receiver object (which is the application) */ @Override public void run() { long startTime; while (true) { /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ deliverLock(); //if (tomLayer != null) { while (tomLayer.isRetrievingState()) { canDeliver.awaitUninterruptibly(); } //} /******************************************************************/ try { //Consensus cons = decided.take(); // take a decided consensus /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ Logger.println("(DeliveryThread.run) Waiting for a consensus to be delivered."); Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS); // take a decided consensus if (cons == null) { Logger.println("(DeliveryThread.run) Timeout while waiting for a consensus, starting over."); deliverUnlock(); continue; } Logger.println("(DeliveryThread.run) A consensus was delivered."); /******************************************************************/ startTime = System.currentTimeMillis(); //TODO: avoid the case in which the received valid proposal is //different from the decided value // obtain an array of requests from the taken consensus BatchReader batchReader = new BatchReader(cons.getDecision(), conf.getUseSignatures()==1); TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision(); if (requests == null) { Logger.println("(DeliveryThread.run) interpreting and verifying batched requests."); int numberOfMessages = batchReader.getNumberOfMessages(); requests = new TOMMessage[numberOfMessages]; for (int i = 0; i < numberOfMessages; i++) { //read the message and its signature from the batch int messageSize = batchReader.getNextMessageSize(); byte[] message = new byte[messageSize]; batchReader.getNextMessage(message); byte[] signature = null; if (conf.getUseSignatures()==1){ signature = new byte[TOMUtil.getSignatureSize()]; batchReader.getNextSignature(signature); } try { DataInputStream ois = new DataInputStream(new ByteArrayInputStream(message)); TOMMessage tm = new TOMMessage(); tm.readExternal(ois); tm.consensusStartTime = cons.startTime; tm.consensusExecutionTime = cons.executionTime; tm.consensusBatchSize = cons.batchSize; /** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */ if (Logger.debug) tm.setSequence(new DebugInfo(cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()))); /****************************************************/ requests[i] = tm; //requests[i] = (TOMMessage) ois.readObject(); tomLayer.clientsManager.requestOrdered(requests[i]); } catch (Exception e) { e.printStackTrace(System.out); } } } else { Logger.println("(DeliveryThread.run) using cached requests from the propose."); tomLayer.clientsManager.getClientsLock().lock(); for (int i = 0; i < requests.length; i++) { /** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */ if (Logger.debug) requests[i].setSequence(new DebugInfo(cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()))); /****************************************************/ requests[i].consensusStartTime = cons.startTime; requests[i].consensusExecutionTime = cons.executionTime; requests[i].consensusBatchSize = cons.batchSize; tomLayer.clientsManager.requestOrdered(requests[i]); } tomLayer.clientsManager.getClientsLock().unlock(); batchReader.skipMessages(); } //set this consensus as the last executed tomLayer.setLastExec(cons.getId()); //define the last stable consensus... the stable consensus can //be removed from the leaderManager and the executionManager /**/ if (cons.getId() > 2) { int stableConsensus = cons.getId() - 3; System.out.println("Last stable consensus: " + stableConsensus); tomLayer.lm.removeStableConsenusInfos(stableConsensus); tomLayer.execManager.removeExecution(stableConsensus); } /**/ //define that end of this execution tomLayer.setInExec(-1); //verify if there is a next proposal to be executed //(it only happens if the previous consensus were decided in a //round > 0 int nextExecution = cons.getId() + 1; if(tomLayer.acceptor.executeAcceptedPendent(nextExecution)) { Logger.println("(DeliveryThread.run) Executed propose for " + nextExecution); } //obtain the nonce and timestamps to be delivered to the application long timestamp = batchReader.getTimestamp(); byte[] nonces = new byte[batchReader.getNumberOfNonces()]; if (nonces.length > 0) { batchReader.getNonces(nonces); } //deliver the request to the application (receiver) for (int i = 0; i < requests.length; i++) { requests[i].timestamp = timestamp; requests[i].nonces = nonces; requests[i].requestTotalLatency = System.currentTimeMillis()-cons.startTime; receiver.receiveOrderedMessage(requests[i]); } /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */ System.out.println("[DeliveryThread.run]"); System.out.println("Acabei de entregar o batch do EID " + cons.getId()); System.out.println("[/DeliveryThread.run]"); if (conf.getCheckpoint_period() > 0) { if ((cons.getId() > 0) && ((cons.getId() % conf.getCheckpoint_period()) == 0)) { Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId()); byte[] state = receiver.getState(); tomLayer.saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())); //TODO: possivelmente fazer mais alguma coisa } else { Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId()); tomLayer.saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())); //TODO: possivelmente fazer mais alguma coisa } } /********************************************************/ Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.currentTimeMillis() - startTime)); } catch (Exception e) { e.printStackTrace(System.out); } /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */ deliverUnlock(); /******************************************************************/ } } }