DeliveryThread.java 20.5 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

import java.util.concurrent.LinkedBlockingQueue;

22
import java.util.concurrent.TimeUnit;
23 24
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
25
import navigators.smart.paxosatwar.Consensus;
26
import navigators.smart.reconfiguration.ReconfigurationManager;
27
import navigators.smart.statemanagment.TransferableState;
P
pjsousa@gmail.com 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
public class DeliveryThread extends Thread {

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus instances waiting to be delivered by run()
    private TOMLayer tomLayer; // TOM layer
    private RequestRecover requestRecover; // TODO: is this still going to be used?
    private TOMRequestReceiver receiver; // Object that receives requests from clients
44
    private ReconfigurationManager manager; // source of the static configuration and of the current view; also queues reconfiguration requests
P
pjsousa@gmail.com 已提交
45 46 47 48 49 50 51

    /**
     * Builds the delivery thread and wires it to its collaborators.
     * The thread is only created here (named "Delivery Thread"); it is not started.
     *
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param manager reconfiguration manager, also handed to the {@code RequestRecover} created here
     */
    public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, ReconfigurationManager manager) {
        super("Delivery Thread");

        this.manager = manager;
        this.receiver = receiver;
        this.tomLayer = tomLayer;
        this.requestRecover = new RequestRecover(tomLayer, manager);
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
        try {
69
            //System.out.println("Consenso decidido! "+cons.getId());
P
pjsousa@gmail.com 已提交
70 71 72 73 74 75
            decided.put(cons);
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. decided size=" + decided.size());
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
76
    /** This is Joao's code, to handle state transfer */
77 78 79 80 81
    // Lock taken by run() around every delivery iteration; also exposed through deliverLock()/deliverUnlock().
    private ReentrantLock deliverLock = new ReentrantLock();
    // Condition of deliverLock on which run() waits while tomLayer.isRetrievingState() is true.
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
82
        Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
83
    }
84

85 86
    public void deliverUnlock() {
        deliverLock.unlock();
87
        Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
88 89 90 91 92
    }

    /**
     * Wakes up every thread waiting on the {@code canDeliver} condition; {@link #run()} waits on it
     * while the TOM layer reports that state is being retrieved.
     * NOTE(review): the condition belongs to {@code deliverLock}, so the caller presumably has to hold
     * that lock (e.g. via {@link #deliverLock()}) when calling this - confirm against the callers.
     */
    public void canDeliver() {
        canDeliver.signalAll();
    }
93

94 95
    public void update(TransferableState state) {

96
        //deliverLock.lock();
97

98 99
        int lastCheckpointEid = state.getLastCheckpointEid();
        int lastEid = state.getLastEid();
100

101
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID " + lastCheckpointEid + " to EID " + lastEid);
102

103
        receiver.setState(state.getState());
104

105
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(), state.getLastCheckpointLeader());
106

107
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
108 109 110

            try {

111
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
112

113 114
                //System.out.println("(TESTE // DeliveryThread.update) EID: " + eid + ", round: " + state.getMessageBatch(eid).round + ", value: " + batch.length);

115 116
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader);

117
                // obtain an array of requests from the taken consensus
118 119
                BatchReader batchReader = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1);
120

121
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
122

123 124 125 126 127 128 129 130
                TOMMessage[] requests = batchReader.deserialiseRequests(manager);

                tomLayer.clientsManager.getClientsLock().lock();
                for (int i = 0; i < requests.length; i++) {

                    tomLayer.clientsManager.requestOrdered(requests[i]);

                }
131 132

                //deliver the request to the application (receiver)
133 134
                tomLayer.clientsManager.getClientsLock().unlock();

135 136 137 138 139 140
                for (int i = 0; i < requests.length; i++) {

                    /******* Deixo isto comentado, pois nao me parece necessario      **********/
                    /******* Alem disso, esta informacao nao vem no TransferableState **********
                    requests[i].requestTotalLatency = System.currentTimeMillis()-cons.startTime;
                    /***************************************************************************/
141 142 143 144 145 146 147 148 149 150 151
                    
                    //receiver.receiveOrderedMessage(requests[i]);
                    
                    //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                     //TODO: verificar se aqui precisa mudar a enterga para as vistas
                     if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);
152

153 154 155
                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
156 157
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
158
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
159
                    }
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
                    
                    //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//
                }

                // isto serve para retirar pedidos que nao cheguem a ser processados pela replica,
                // uma vez que se saltaram varias execucoes de consenso para a frente
                tomLayer.clientsManager.removeRequests(requests);

                //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                //TODO: verificar se aqui precisa mudar a enterga para as vistas
                if (this.manager.hasUpdates()) {
                    //System.out.println("Entrou aqui 1");
                    
                    //byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
181 182
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), 
                                dests[i].getSession(), dests[i].getSequence(),
183 184
                                response, this.manager.getCurrentViewId()));

185
                    }
186 187 188
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
189
                }
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
                //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//


            /****** Julgo que isto nao sera necessario ***********
            if (conf.getCheckpoint_period() > 0) {
            if ((eid > 0) && (eid % conf.getCheckpoint_period() == 0)) {
            Logger.println("(DeliveryThread.update) Performing checkpoint for consensus " + eid);
            byte[] state2 = receiver.getState();
            tomLayer.saveState(state2, eid);
            //TODO: possivelmente fazer mais alguma coisa
            }
            else {
            Logger.println("(DeliveryThread.update) Storing message batch in the state log for consensus " + eid);
            tomLayer.saveBatch(batch, eid);
            //TODO: possivelmente fazer mais alguma coisa
            }
            }
             */
208
            } catch (Exception e) {
209
                e.printStackTrace(System.out);
210 211 212 213
            }

        }

214 215 216 217 218 219
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
220
            int stableConsensus = lastEid - 3;
221 222

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
223
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
224 225 226
        }

        //define that end of this execution
227
        //stateManager.setWaiting(-1);
228
        tomLayer.setNoExec();
229

230
        decided.clear();
231 232

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
233 234 235 236 237 238 239 240 241 242 243 244
    //verify if there is a next proposal to be executed
    //(it only happens if the previous consensus were decided in a
    //round > 0
    /** Nao consigo perceber se isto tem utilidade neste contexto *****/
    //int nextExecution = lastEid + 1;
    //if(tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
    //Logger.println("(DeliveryThread.update) Executed propose for " + nextExecution);
    //}
    /******************************************************************/

    //canDeliver.signalAll();
    //deliverLock.unlock();
245 246
    }

247
    /********************************************************/
P
pjsousa@gmail.com 已提交
248 249 250 251 252
    /**
     * This is the code for the thread. It delivers decided consensus to the TOM request receiver object (which is the application)
     */
    @Override
    public void run() {
253

P
pjsousa@gmail.com 已提交
254 255
        long startTime;
        while (true) {
256 257

            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
258
            deliverLock();
259 260

            //if (tomLayer != null) {
261 262 263
            while (tomLayer.isRetrievingState()) {
                canDeliver.awaitUninterruptibly();
            }
264 265
            //}
            /******************************************************************/
P
pjsousa@gmail.com 已提交
266
            try {
267

268 269 270 271
                //Consensus cons = decided.take(); // take a decided consensus
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
                Logger.println("(DeliveryThread.run) Waiting for a consensus to be delivered.");
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS); // take a decided consensus
272
                
273 274 275 276 277 278 279
                if (cons == null) {
                    Logger.println("(DeliveryThread.run) Timeout while waiting for a consensus, starting over.");
                    deliverUnlock();
                    continue;
                }
                Logger.println("(DeliveryThread.run) A consensus was delivered.");
                /******************************************************************/
P
pjsousa@gmail.com 已提交
280
                startTime = System.currentTimeMillis();
281 282 283
                
                
                //System.out.println("vai entragar o consenso: "+cons.getId());
P
pjsousa@gmail.com 已提交
284 285 286 287

                //TODO: avoid the case in which the received valid proposal is
                //different from the decided value

288 289 290 291 292 293
                

                //System.out.println("chegou aqui 1: "+cons.getId());

                //System.out.println("(TESTE // DeliveryThread.run) EID: " + cons.getId() + ", round: " + cons.getDecisionRound() + ", value: " + cons.getDecision().length);

P
pjsousa@gmail.com 已提交
294 295
                TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision();

296 297
                 //System.out.println("chegou aqui 2: "+cons.getId());
                
P
pjsousa@gmail.com 已提交
298
                if (requests == null) {
299 300 301
                    
                     //System.out.println("chegou aqui 3 a: "+cons.getId());
                    
P
pjsousa@gmail.com 已提交
302 303
                    Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

304 305 306 307
                    // obtain an array of requests from the taken consensus
                    BatchReader batchReader = new BatchReader(cons.getDecision(),
                            manager.getStaticConf().getUseSignatures() == 1);
                    requests = batchReader.deserialiseRequests(manager);
P
pjsousa@gmail.com 已提交
308

309
                } else {
310 311
                    //System.out.println("chegou aqui 3 b: "+cons.getId());
                    if (Logger.debug) {
312
                        Logger.println("(DeliveryThread.run) using cached requests from the propose.");
313
                    }
P
pjsousa@gmail.com 已提交
314

315 316 317 318 319 320 321 322
                }
                
               
                 //System.out.println("chegou aqui 4: "+cons.getId());
                
                tomLayer.clientsManager.getClientsLock().lock();
                 //System.out.println("chegou aqui 5: "+cons.getId());
                for (int i = 0; i < requests.length; i++) {
323

324
                    /** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */
325 326
//                    if (Logger.debug)
//                        requests[i].setSequence(new DebugInfo(cons.getId(), cons.getDecisionRound(), lm.getLeader(cons.getId(), cons.getDecisionRound())));
327 328 329 330 331 332 333 334 335 336
                    /****************************************************/
                    requests[i].consensusStartTime = cons.startTime;
                    requests[i].consensusExecutionTime = cons.executionTime;
                    requests[i].consensusBatchSize = cons.batchSize;
                    //System.out.println("chegou aqui 6: "+cons.getId());
                    tomLayer.clientsManager.requestOrdered(requests[i]);
                   //System.out.println("chegou aqui 7: "+cons.getId()); 
                }
                tomLayer.clientsManager.getClientsLock().unlock();
                //System.out.println("chegou aqui 8: "+cons.getId());
P
pjsousa@gmail.com 已提交
337 338 339 340 341 342

                //set this consensus as the last executed
                tomLayer.setLastExec(cons.getId());

                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
343
                /**/
P
pjsousa@gmail.com 已提交
344 345 346 347 348 349
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }
350
                /**/
351
               
P
pjsousa@gmail.com 已提交
352 353 354 355 356

                //verify if there is a next proposal to be executed
                //(it only happens if the previous consensus were decided in a
                //round > 0
                int nextExecution = cons.getId() + 1;
357
                if (tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
P
pjsousa@gmail.com 已提交
358 359 360 361 362
                    Logger.println("(DeliveryThread.run) Executed propose for " + nextExecution);
                }

                //deliver the request to the application (receiver)
                for (int i = 0; i < requests.length; i++) {
363 364 365 366 367 368 369 370 371 372 373 374 375
                    requests[i].requestTotalLatency = System.currentTimeMillis() - cons.startTime;

                    //******* EDUARDO BEGIN **************//
                    if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);

                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
376 377
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
378 379 380 381
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
                    }
                    
                        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
382 383
                }

384

385 386 387 388 389 390 391 392 393 394 395 396 397 398
                //******* EDUARDO BEGIN **************//
                if (this.manager.hasUpdates()) {
                    //System.out.println("Entrou aqui 1");
                    
                    
                    
                    //byte[] response = this.manager.executeUpdates(cons.getId(),cons.getDecisionRound().getNumber(),this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(cons.getId(), cons.getDecisionRound().getNumber(),blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
399 400
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSession(),
                                dests[i].getSequence(), response, this.manager.getCurrentViewId()));
401 402 403 404 405 406 407 408 409 410

                    }
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
                }
                //******* EDUARDO END **************//
                
                 //define that end of this execution
                tomLayer.setInExec(-1);
411
                tomLayer.processOutOfContext();
412 413

                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
414 415
                Logger.println("(DeliveryThread.run) I just delivered the batch of EID " + cons.getId());

416
                if (manager.getStaticConf().isStateTransferEnabled()) {
417
                    Logger.println("(DeliveryThread.run) The state transfer protocol is enabled");
418 419
                    if (manager.getStaticConf().getCheckpoint_period() > 0) {
                        if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpoint_period()) == 0)) {
420 421
                            Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                            byte[] state = receiver.getState();
422 423 424
                            tomLayer.saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
                        } else {
425
                            Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
426 427
                            tomLayer.saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
428
                        }
P
pjsousa@gmail.com 已提交
429 430 431 432 433
                    }
                }
                /********************************************************/
                Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.currentTimeMillis() - startTime));
            } catch (Exception e) {
434
                e.printStackTrace(System.out);
P
pjsousa@gmail.com 已提交
435
            }
436
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
437
            deliverUnlock();
438
        /******************************************************************/
P
pjsousa@gmail.com 已提交
439 440 441
        }
    }
}