DeliveryThread.java 19.2 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

import java.util.concurrent.LinkedBlockingQueue;

22
import java.util.concurrent.TimeUnit;
23 24
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
25
import navigators.smart.paxosatwar.Consensus;
26
import navigators.smart.reconfiguration.ReconfigurationManager;
27
import navigators.smart.statemanagment.TransferableState;
P
pjsousa@gmail.com 已提交
28 29 30
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.BatchReader;
B
bessani@gmail.com 已提交
31
import navigators.smart.tom.util.DebugInfo;
P
pjsousa@gmail.com 已提交
32 33 34 35 36 37 38 39 40 41 42 43
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
public class DeliveryThread extends Thread {

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
    private TOMRequestReceiver receiver; // Object that receives requests from clients
44
    private ReconfigurationManager manager;
P
pjsousa@gmail.com 已提交
45 46 47 48 49 50 51

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param conf TOM configuration
     */
52
    public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, ReconfigurationManager manager) {
P
pjsousa@gmail.com 已提交
53 54 55 56
        super("Delivery Thread");

        this.tomLayer = tomLayer;
        this.receiver = receiver;
57 58 59
        //******* EDUARDO BEGIN **************//
        this.manager = manager;
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
60 61 62 63 64 65 66 67
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
        try {
68
            //System.out.println("Consenso decidido! "+cons.getId());
P
pjsousa@gmail.com 已提交
69 70 71 72 73 74
            decided.put(cons);
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. decided size=" + decided.size());
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
75
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
76 77 78 79 80
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
81
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
82
    }
83

84 85
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
86
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
87 88 89 90 91
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
92

93 94
    public void update(TransferableState state) {

95
        //deliverLock.lock();
96

97 98
        int lastCheckpointEid = state.getLastCheckpointEid();
        int lastEid = state.getLastEid();
99

100
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID " + lastCheckpointEid + " to EID " + lastEid);
101

102
        receiver.setState(state.getState());
103

104
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(), state.getLastCheckpointLeader());
105

106
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
107 108 109

            try {

110
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
111

112 113
                //System.out.println("(TESTE // DeliveryThread.update) EID: " + eid + ", round: " + state.getMessageBatch(eid).round + ", value: " + batch.length);

114 115
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader);

116
                // obtain an array of requests from the taken consensus
117 118
                BatchReader batchReader = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1);
119

120
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
121

122 123 124 125 126 127 128 129
                TOMMessage[] requests = batchReader.deserialiseRequests(manager);

                tomLayer.clientsManager.getClientsLock().lock();
                for (int i = 0; i < requests.length; i++) {

                    tomLayer.clientsManager.requestOrdered(requests[i]);

                }
130 131

                //deliver the request to the application (receiver)
132 133
                tomLayer.clientsManager.getClientsLock().unlock();

134 135 136 137
                for (int i = 0; i < requests.length; i++) {

                    /******* Deixo isto comentado, pois nao me parece necessario      **********/
                    /******* Alem disso, esta informacao nao vem no TransferableState **********
B
bessani@gmail.com 已提交
138
                    requests[i].requestTotalLatency = System.nanoTime()-cons.startTime;
139
                    /***************************************************************************/
140 141 142 143 144 145 146 147 148 149 150
                    
                    //receiver.receiveOrderedMessage(requests[i]);
                    
                    //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                     //TODO: verificar se aqui precisa mudar a enterga para as vistas
                     if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);
151

152 153 154
                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
155 156
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
157
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
158
                    }
159 160 161 162 163 164 165 166 167 168 169
                    
                    //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//
                }

                // isto serve para retirar pedidos que nao cheguem a ser processados pela replica,
                // uma vez que se saltaram varias execucoes de consenso para a frente
                tomLayer.clientsManager.removeRequests(requests);

                //******* EDUARDO BEGIN: Acho que precisa mudar aqui, como na entrega normal **************//
                //TODO: verificar se aqui precisa mudar a enterga para as vistas
                if (this.manager.hasUpdates()) {
170 171

                    receiver.waitForProcessingRequests();
172 173 174 175 176 177 178 179 180 181
                    //System.out.println("Entrou aqui 1");
                    
                    //byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(eid,state.getMessageBatch(eid).round,blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
182 183
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), 
                                dests[i].getSession(), dests[i].getSequence(),
184 185
                                response, this.manager.getCurrentViewId()));

186
                    }
187 188 189
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
190
                }
191
                //******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//
192
            } catch (Exception e) {
193
                e.printStackTrace(System.out);
194 195 196 197
            }

        }

198 199 200 201 202 203
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
204
            int stableConsensus = lastEid - 3;
205 206

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
207
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
208 209 210
        }

        //define that end of this execution
211
        //stateManager.setWaiting(-1);
212
        tomLayer.setNoExec();
213

214
        decided.clear();
215 216

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
217 218 219 220 221 222 223 224 225 226 227 228
    //verify if there is a next proposal to be executed
    //(it only happens if the previous consensus were decided in a
    //round > 0
    /** Nao consigo perceber se isto tem utilidade neste contexto *****/
    //int nextExecution = lastEid + 1;
    //if(tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
    //Logger.println("(DeliveryThread.update) Executed propose for " + nextExecution);
    //}
    /******************************************************************/

    //canDeliver.signalAll();
    //deliverLock.unlock();
229 230
    }

231
    /********************************************************/
P
pjsousa@gmail.com 已提交
232 233 234 235 236
    /**
     * This is the code for the thread. It delivers decided consensus to the TOM request receiver object (which is the application)
     */
    @Override
    public void run() {
237

P
pjsousa@gmail.com 已提交
238 239
        long startTime;
        while (true) {
240 241

            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
242
            deliverLock();
243 244

            //if (tomLayer != null) {
245 246 247
            while (tomLayer.isRetrievingState()) {
                canDeliver.awaitUninterruptibly();
            }
248 249
            //}
            /******************************************************************/
P
pjsousa@gmail.com 已提交
250
            try {
251

252 253
                //Consensus cons = decided.take(); // take a decided consensus
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
B
bessani@gmail.com 已提交
254
                //Logger.println("(DeliveryThread.run) Waiting for a consensus to be delivered.");
255
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS); // take a decided consensus
256
                
257
                if (cons == null) {
B
bessani@gmail.com 已提交
258
                    //Logger.println("(DeliveryThread.run) Timeout while waiting for a consensus, starting over.");
259 260 261 262 263
                    deliverUnlock();
                    continue;
                }
                Logger.println("(DeliveryThread.run) A consensus was delivered.");
                /******************************************************************/
B
bessani@gmail.com 已提交
264
                startTime = System.nanoTime();
265 266 267
                
                
                //System.out.println("vai entragar o consenso: "+cons.getId());
P
pjsousa@gmail.com 已提交
268

269 270
                //System.out.println("(TESTE // DeliveryThread.run) EID: " + cons.getId() + ", round: " + cons.getDecisionRound() + ", value: " + cons.getDecision().length);

P
pjsousa@gmail.com 已提交
271 272
                TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision();

273 274
                 //System.out.println("chegou aqui 2: "+cons.getId());
                
P
pjsousa@gmail.com 已提交
275
                if (requests == null) {
276 277 278
                    
                     //System.out.println("chegou aqui 3 a: "+cons.getId());
                    
P
pjsousa@gmail.com 已提交
279 280
                    Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

281 282 283 284
                    // obtain an array of requests from the taken consensus
                    BatchReader batchReader = new BatchReader(cons.getDecision(),
                            manager.getStaticConf().getUseSignatures() == 1);
                    requests = batchReader.deserialiseRequests(manager);
P
pjsousa@gmail.com 已提交
285

286
                } else {
287 288
                    //System.out.println("chegou aqui 3 b: "+cons.getId());
                    if (Logger.debug) {
289
                        Logger.println("(DeliveryThread.run) using cached requests from the propose.");
290
                    }
P
pjsousa@gmail.com 已提交
291

292
                }
B
bessani@gmail.com 已提交
293
                                
294
                tomLayer.clientsManager.getClientsLock().lock();
295

B
bessani@gmail.com 已提交
296 297 298 299
                if(requests[0].equals(cons.firstMessageProposed))
                    requests[0] = cons.firstMessageProposed;

                for (int i = 0; i < requests.length; i++) {
300
                    /** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */
B
bessani@gmail.com 已提交
301
                    requests[i].setDebugInfo(new DebugInfo(cons.getId(),0,0,requests[i]));
302
                    /****************************************************/
B
bessani@gmail.com 已提交
303
                    
304 305 306 307 308 309
                    //System.out.println("chegou aqui 6: "+cons.getId());
                    tomLayer.clientsManager.requestOrdered(requests[i]);
                   //System.out.println("chegou aqui 7: "+cons.getId()); 
                }
                tomLayer.clientsManager.getClientsLock().unlock();
                //System.out.println("chegou aqui 8: "+cons.getId());
P
pjsousa@gmail.com 已提交
310 311 312 313 314 315 316 317 318 319 320 321

                //set this consensus as the last executed
                tomLayer.setLastExec(cons.getId());

                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }
322
               
P
pjsousa@gmail.com 已提交
323 324 325 326 327

                //verify if there is a next proposal to be executed
                //(it only happens if the previous consensus were decided in a
                //round > 0
                int nextExecution = cons.getId() + 1;
328
                if (tomLayer.acceptor.executeAcceptedPendent(nextExecution)) {
P
pjsousa@gmail.com 已提交
329 330 331 332 333
                    Logger.println("(DeliveryThread.run) Executed propose for " + nextExecution);
                }

                //deliver the request to the application (receiver)
                for (int i = 0; i < requests.length; i++) {
B
bessani@gmail.com 已提交
334
                    requests[i].deliveryTime = System.nanoTime();
335 336 337 338

                    //******* EDUARDO BEGIN **************//
                    if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
                        if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
B
bessani@gmail.com 已提交
339
                            //normal request execution
340 341 342 343 344 345 346
                            receiver.receiveOrderedMessage(requests[i]);
                        } else {
                            //Reconfiguration request processing!
                            this.manager.enqueueUpdate(requests[i]);
                        }
                    } else {
                        this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
347 348
                                new TOMMessage(this.manager.getStaticConf().getProcessId(),
                                requests[i].getSession(), requests[i].getSequence(),
349 350
                                TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
                    }
B
bessani@gmail.com 已提交
351
                    //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
352 353
                }

354

355 356 357
                //******* EDUARDO BEGIN **************//
                if (this.manager.hasUpdates()) {
                    //System.out.println("Entrou aqui 1");
358
                    receiver.waitForProcessingRequests();
359 360 361 362 363 364 365 366 367 368
                    
                    
                    //byte[] response = this.manager.executeUpdates(cons.getId(),cons.getDecisionRound().getNumber(),this.receiver.getState());
                    byte[] blabla = {1,2,3};
                    byte[] response = this.manager.executeUpdates(cons.getId(), cons.getDecisionRound().getNumber(),blabla);
                    TOMMessage[] dests = this.manager.clearUpdates();

                    for (int i = 0; i < dests.length; i++) {
                        //System.out.println("Entrou aqui 2");
                        this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
369 370
                                new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSession(),
                                dests[i].getSequence(), response, this.manager.getCurrentViewId()));
371 372 373 374 375 376 377 378 379 380

                    }
                    
                    this.tomLayer.getCommunication().updateServersConnections();
                    
                }
                //******* EDUARDO END **************//
                
                 //define that end of this execution
                tomLayer.setInExec(-1);
381
                tomLayer.processOutOfContext();
382 383

                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
384 385
                Logger.println("(DeliveryThread.run) I just delivered the batch of EID " + cons.getId());

386 387
                //Thread.sleep(5);

388
                if (manager.getStaticConf().isStateTransferEnabled()) {
389
                    Logger.println("(DeliveryThread.run) The state transfer protocol is enabled");
390 391
                    if (manager.getStaticConf().getCheckpoint_period() > 0) {
                        if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpoint_period()) == 0)) {
392 393
                            Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                            byte[] state = receiver.getState();
394 395 396
                            tomLayer.saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
                        } else {
397
                            Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
398 399
                            tomLayer.saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                        //TODO: possivelmente fazer mais alguma coisa
400
                        }
P
pjsousa@gmail.com 已提交
401 402 403
                    }
                }
                /********************************************************/
B
bessani@gmail.com 已提交
404
                Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.nanoTime() - startTime));
P
pjsousa@gmail.com 已提交
405
            } catch (Exception e) {
406
                e.printStackTrace(System.out);
P
pjsousa@gmail.com 已提交
407
            }
408
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
409
            deliverUnlock();
410
        /******************************************************************/
P
pjsousa@gmail.com 已提交
411 412 413
        }
    }
}