DeliveryThread.java 20.6 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

import java.util.concurrent.LinkedBlockingQueue;

22
import java.util.concurrent.TimeUnit;
23 24
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
25
import navigators.smart.paxosatwar.Consensus;
26
import navigators.smart.reconfiguration.ReconfigurationManager;
27
import navigators.smart.statemanagment.TransferableState;
P
pjsousa@gmail.com 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
public class DeliveryThread extends Thread {

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
    private RequestRecover requestRecover; // TODO: isto ainda vai ser usado?
    private TOMRequestReceiver receiver; // Object that receives requests from clients
44
    private ReconfigurationManager manager;
P
pjsousa@gmail.com 已提交
45 46 47 48 49 50 51

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param conf TOM configuration
     */
52
    public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, ReconfigurationManager manager) {
P
pjsousa@gmail.com 已提交
53 54 55 56
        super("Delivery Thread");

        this.tomLayer = tomLayer;
        this.receiver = receiver;
57 58 59 60
        //******* EDUARDO BEGIN **************//
        this.manager = manager;
        this.requestRecover = new RequestRecover(tomLayer, manager);
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
61 62 63 64 65 66 67 68
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
        try {
69
            //System.out.println("Consenso decidido! "+cons.getId());
P
pjsousa@gmail.com 已提交
70 71 72 73 74 75
            decided.put(cons);
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. decided size=" + decided.size());
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
76
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
77 78 79 80 81
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
82
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
83
    }
84

85 86
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
87
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
88 89 90 91 92
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
93

94 95
    public void update(TransferableState state) {

96
        //deliverLock.lock();
97

98 99
        int lastCheckpointEid = state.getLastCheckpointEid();
        int lastEid = state.getLastEid();
100

101
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID " + lastCheckpointEid + " to EID " + lastEid);
102

103
        receiver.setState(state.getState());
104

105
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(), state.getLastCheckpointLeader());
106

107
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
108 109 110

            try {

111
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
112

113 114
                //System.out.println("(TESTE // DeliveryThread.update) EID: " + eid + ", round: " + state.getMessageBatch(eid).round + ", value: " + batch.length);

115 116
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader);

117
                // obtain an array of requests from the taken consensus
118 119
                BatchReader batchReader = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1);
120

121
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
122

123 124 125 126 127 128 129 130
                TOMMessage[] requests = batchReader.deserialiseRequests(manager);

                tomLayer.clientsManager.getClientsLock().lock();
                for (int i = 0; i < requests.length; i++) {

                    tomLayer.clientsManager.requestOrdered(requests[i]);

                }
131 132

                //deliver the request to the application (receiver)
133 134
                tomLayer.clientsManager.getClientsLock().unlock();

135 136 137 138 139 140
                for (int i = 0; i < requests.length; i++) {

                    /******* Deixo isto comentado, pois nao me parece necessario      **********/
                    /******* Alem disso, esta informacao nao vem no TransferableState **********
                    requests[i].requestTotalLatency = System.currentTimeMillis()-cons.startTime;
                    /***************************************************************************/
141 142 143 144 145 146 147 148 149 150 151
                    
                    //receiver.receiveOrderedMessage(requests[i]);
                    
                    //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                     //TODO: verificar se aqui precisa mudar a enterga para as vistas
                     if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);
152

153 154 155
                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
156 157
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
158
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
159
                    }
160 161 162 163 164 165 166 167 168 169 170
                    
                    //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//
                }

                // isto serve para retirar pedidos que nao cheguem a ser processados pela replica,
                // uma vez que se saltaram varias execucoes de consenso para a frente
                tomLayer.clientsManager.removeRequests(requests);

                //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                //TODO: verificar se aqui precisa mudar a enterga para as vistas
                if (this.manager.hasUpdates()) {
171 172

                    receiver.waitForProcessingRequests();
173 174 175 176 177 178 179 180 181 182
                    //System.out.println("Entrou aqui 1");
                    
                    //byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
183 184
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), 
                                dests[i].getSession(), dests[i].getSequence(),
185 186
                                response, this.manager.getCurrentViewId()));

187
                    }
188 189 190
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
191
                }
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
                //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//


            /****** Julgo que isto nao sera necessario ***********
            if (conf.getCheckpoint_period() > 0) {
            if ((eid > 0) && (eid % conf.getCheckpoint_period() == 0)) {
            Logger.println("(DeliveryThread.update) Performing checkpoint for consensus " + eid);
            byte[] state2 = receiver.getState();
            tomLayer.saveState(state2, eid);
            //TODO: possivelmente fazer mais alguma coisa
            }
            else {
            Logger.println("(DeliveryThread.update) Storing message batch in the state log for consensus " + eid);
            tomLayer.saveBatch(batch, eid);
            //TODO: possivelmente fazer mais alguma coisa
            }
            }
             */
210
            } catch (Exception e) {
211
                e.printStackTrace(System.out);
212 213 214 215
            }

        }

216 217 218 219 220 221
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
222
            int stableConsensus = lastEid - 3;
223 224

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
225
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
226 227 228
        }

        //define that end of this execution
229
        //stateManager.setWaiting(-1);
230
        tomLayer.setNoExec();
231

232
        decided.clear();
233 234

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
235 236 237 238 239 240 241 242 243 244 245 246
    //verify if there is a next proposal to be executed
    //(it only happens if the previous consensus were decided in a
    //round > 0
    /** Nao consigo perceber se isto tem utilidade neste contexto *****/
    //int nextExecution = lastEid + 1;
    //if(tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
    //Logger.println("(DeliveryThread.update) Executed propose for " + nextExecution);
    //}
    /******************************************************************/

    //canDeliver.signalAll();
    //deliverLock.unlock();
247 248
    }

249
    /********************************************************/
P
pjsousa@gmail.com 已提交
250 251 252 253 254
    /**
     * This is the code for the thread. It delivers decided consensus to the TOM request receiver object (which is the application)
     */
    @Override
    public void run() {
255

P
pjsousa@gmail.com 已提交
256 257
        long startTime;
        while (true) {
258 259

            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
260
            deliverLock();
261 262

            //if (tomLayer != null) {
263 264 265
            while (tomLayer.isRetrievingState()) {
                canDeliver.awaitUninterruptibly();
            }
266 267
            //}
            /******************************************************************/
P
pjsousa@gmail.com 已提交
268
            try {
269

270 271
                //Consensus cons = decided.take(); // take a decided consensus
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
B
bessani@gmail.com 已提交
272
                //Logger.println("(DeliveryThread.run) Waiting for a consensus to be delivered.");
273
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS); // take a decided consensus
274
                
275
                if (cons == null) {
B
bessani@gmail.com 已提交
276
                    //Logger.println("(DeliveryThread.run) Timeout while waiting for a consensus, starting over.");
277 278 279 280 281
                    deliverUnlock();
                    continue;
                }
                Logger.println("(DeliveryThread.run) A consensus was delivered.");
                /******************************************************************/
P
pjsousa@gmail.com 已提交
282
                startTime = System.currentTimeMillis();
283 284 285
                
                
                //System.out.println("vai entragar o consenso: "+cons.getId());
P
pjsousa@gmail.com 已提交
286 287 288 289

                //TODO: avoid the case in which the received valid proposal is
                //different from the decided value

290 291 292 293 294 295
                

                //System.out.println("chegou aqui 1: "+cons.getId());

                //System.out.println("(TESTE // DeliveryThread.run) EID: " + cons.getId() + ", round: " + cons.getDecisionRound() + ", value: " + cons.getDecision().length);

P
pjsousa@gmail.com 已提交
296 297
                TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision();

298 299
                 //System.out.println("chegou aqui 2: "+cons.getId());
                
P
pjsousa@gmail.com 已提交
300
                if (requests == null) {
301 302 303
                    
                     //System.out.println("chegou aqui 3 a: "+cons.getId());
                    
P
pjsousa@gmail.com 已提交
304 305
                    Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

306 307 308 309
                    // obtain an array of requests from the taken consensus
                    BatchReader batchReader = new BatchReader(cons.getDecision(),
                            manager.getStaticConf().getUseSignatures() == 1);
                    requests = batchReader.deserialiseRequests(manager);
P
pjsousa@gmail.com 已提交
310

311
                } else {
312 313
                    //System.out.println("chegou aqui 3 b: "+cons.getId());
                    if (Logger.debug) {
314
                        Logger.println("(DeliveryThread.run) using cached requests from the propose.");
315
                    }
P
pjsousa@gmail.com 已提交
316

317 318 319 320 321 322 323 324
                }
                
               
                 //System.out.println("chegou aqui 4: "+cons.getId());
                
                tomLayer.clientsManager.getClientsLock().lock();
                 //System.out.println("chegou aqui 5: "+cons.getId());
                for (int i = 0; i < requests.length; i++) {
325

326
                    /** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */
327 328
//                    if (Logger.debug)
//                        requests[i].setSequence(new DebugInfo(cons.getId(), cons.getDecisionRound(), lm.getLeader(cons.getId(), cons.getDecisionRound())));
329 330 331 332 333 334 335 336 337 338
                    /****************************************************/
                    requests[i].consensusStartTime = cons.startTime;
                    requests[i].consensusExecutionTime = cons.executionTime;
                    requests[i].consensusBatchSize = cons.batchSize;
                    //System.out.println("chegou aqui 6: "+cons.getId());
                    tomLayer.clientsManager.requestOrdered(requests[i]);
                   //System.out.println("chegou aqui 7: "+cons.getId()); 
                }
                tomLayer.clientsManager.getClientsLock().unlock();
                //System.out.println("chegou aqui 8: "+cons.getId());
P
pjsousa@gmail.com 已提交
339 340 341 342 343 344

                //set this consensus as the last executed
                tomLayer.setLastExec(cons.getId());

                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
345
                /**/
P
pjsousa@gmail.com 已提交
346 347 348 349 350 351
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }
352
                /**/
353
               
P
pjsousa@gmail.com 已提交
354 355 356 357 358

                //verify if there is a next proposal to be executed
                //(it only happens if the previous consensus were decided in a
                //round > 0
                int nextExecution = cons.getId() + 1;
359
                if (tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
P
pjsousa@gmail.com 已提交
360 361 362 363 364
                    Logger.println("(DeliveryThread.run) Executed propose for " + nextExecution);
                }

                //deliver the request to the application (receiver)
                for (int i = 0; i < requests.length; i++) {
365 366 367 368 369 370 371 372 373 374 375 376 377
                    requests[i].requestTotalLatency = System.currentTimeMillis() - cons.startTime;

                    //******* EDUARDO BEGIN **************//
                    if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);

                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
378 379
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
380 381 382 383
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
                    }
                    
                        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
384 385
                }

386

387 388 389
                //******* EDUARDO BEGIN **************//
                if (this.manager.hasUpdates()) {
                    //System.out.println("Entrou aqui 1");
390
                    receiver.waitForProcessingRequests();
391 392 393 394 395 396 397 398 399 400
                    
                    
                    //byte[] response = this.manager.executeUpdates(cons.getId(),cons.getDecisionRound().getNumber(),this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(cons.getId(), cons.getDecisionRound().getNumber(),blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
401 402
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSession(),
                                dests[i].getSequence(), response, this.manager.getCurrentViewId()));
403 404 405 406 407 408 409 410 411 412

                    }
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
                }
                //******* EDUARDO END **************//
                
                 //define that end of this execution
                tomLayer.setInExec(-1);
413
                tomLayer.processOutOfContext();
414 415

                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
416 417
                Logger.println("(DeliveryThread.run) I just delivered the batch of EID " + cons.getId());

418 419
                //Thread.sleep(5);

420
                if (manager.getStaticConf().isStateTransferEnabled()) {
421
                    Logger.println("(DeliveryThread.run) The state transfer protocol is enabled");
422 423
                    if (manager.getStaticConf().getCheckpoint_period() > 0) {
                        if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpoint_period()) == 0)) {
424 425
                            Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                            byte[] state = receiver.getState();
426 427 428
                            tomLayer.saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
                        } else {
429
                            Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
430 431
                            tomLayer.saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
432
                        }
P
pjsousa@gmail.com 已提交
433 434 435 436 437
                    }
                }
                /********************************************************/
                Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.currentTimeMillis() - startTime));
            } catch (Exception e) {
438
                e.printStackTrace(System.out);
P
pjsousa@gmail.com 已提交
439
            }
440
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
441
            deliverUnlock();
442
        /******************************************************************/
P
pjsousa@gmail.com 已提交
443 444 445
        }
    }
}