DeliveryThread.java 14.4 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

B
bessani@gmail.com 已提交
20
import navigators.smart.tom.MessageContext;
P
pjsousa@gmail.com 已提交
21 22
import java.util.concurrent.LinkedBlockingQueue;

23
import java.util.concurrent.TimeUnit;
24 25
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
26
import navigators.smart.paxosatwar.Consensus;
27
import navigators.smart.reconfiguration.ServerViewManager;
28
import navigators.smart.statemanagment.TransferableState;
P
pjsousa@gmail.com 已提交
29 30
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
B
bessani@gmail.com 已提交
31
import navigators.smart.tom.core.messages.TOMMessageType;
P
pjsousa@gmail.com 已提交
32 33 34 35 36 37 38 39
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
40
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
41 42 43 44

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
    private TOMRequestReceiver receiver; // Object that receives requests from clients
45
    private ServerViewManager manager;
P
pjsousa@gmail.com 已提交
46 47 48 49 50

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
51
     * @param conf TOM configuration
P
pjsousa@gmail.com 已提交
52
     */
53
    public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, ServerViewManager manager) {
P
pjsousa@gmail.com 已提交
54 55 56 57
        super("Delivery Thread");

        this.tomLayer = tomLayer;
        this.receiver = receiver;
58 59 60
        //******* EDUARDO BEGIN **************//
        this.manager = manager;
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
61 62 63 64 65 66 67
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
68
        if (!containsGoodReconfig(cons)) {
B
bessani@gmail.com 已提交
69 70 71 72 73
            //set this consensus as the last executed
            tomLayer.setLastExec(cons.getId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        }
P
pjsousa@gmail.com 已提交
74 75
        try {
            decided.put(cons);
B
bessani@gmail.com 已提交
76
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
77 78 79 80
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
81

82
    private boolean containsGoodReconfig(Consensus cons) {
B
bessani@gmail.com 已提交
83 84 85
        TOMMessage[] decidedMessages = cons.getDeserializedDecision();

        for (TOMMessage decidedMessage : decidedMessages) {
86 87
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
                    && decidedMessage.getViewID() == manager.getCurrentViewId()) {
B
bessani@gmail.com 已提交
88 89 90 91 92 93 94
                return true;
            }
        }

        return false;
    }

95
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
96 97 98 99 100
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
101
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
102
    }
103

104 105
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
106
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
107 108 109 110 111
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
112

113 114
    public void update(TransferableState state) {

115
        int lastCheckpointEid = state.getLastCheckpointEid();
116 117
        //int lastEid = state.getLastEid();
        int lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
118

B
bessani@gmail.com 已提交
119 120
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
                + lastCheckpointEid + " to EID " + lastEid);
121

122
        receiver.setState(state.getState());
123

B
bessani@gmail.com 已提交
124 125
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
                state.getLastCheckpointLeader());
126

127
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
128
            try {
129
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
130

B
bessani@gmail.com 已提交
131 132
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
                        state.getMessageBatch(eid).leader);
133

134
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
135

B
bessani@gmail.com 已提交
136 137
                TOMMessage[] requests = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
138

B
bessani@gmail.com 已提交
139
                tomLayer.clientsManager.requestsOrdered(requests);
140

141
                deliverMessages(eid, tomLayer.getLCManager().getLastReg(), requests);
142

B
bessani@gmail.com 已提交
143 144 145
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
146
                }
B
bessani@gmail.com 已提交
147
                //******* EDUARDO END **************//
148
            } catch (Exception e) {
B
bessani@gmail.com 已提交
149
                e.printStackTrace(System.err);
150 151 152 153 154 155 156 157 158
                if (e instanceof ArrayIndexOutOfBoundsException) {

                    
                            
                    System.out.println("Eid do ultimo checkpoint: " + state.getLastCheckpointEid());
                    System.out.println("Eid do ultimo consenso: " + state.getLastEid());
                    System.out.println("numero de mensagens supostamente no batch: " + (state.getLastEid() - state.getLastCheckpointEid() + 1));
                    System.out.println("numero de mensagens realmente no batch: " + state.getMessageBatches().length);
                }
159 160 161 162
            }

        }

163 164 165 166 167 168
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
169
            int stableConsensus = lastEid - 3;
170 171

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
172
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
173 174 175
        }

        //define that end of this execution
176
        //stateManager.setWaiting(-1);
177
        tomLayer.setNoExec();
178

179
        decided.clear();
180 181

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
182 183
    }

P
pjsousa@gmail.com 已提交
184
    /**
B
bessani@gmail.com 已提交
185 186
     * This is the code for the thread. It delivers decided consensus to the TOM
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
187 188 189 190
     */
    @Override
    public void run() {
        while (true) {
191
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
192
            deliverLock();
193
            while (tomLayer.isRetrievingState()) {
194
                Logger.println("(DeliveryThread.run) Retrieving State.");
195 196
                canDeliver.awaitUninterruptibly();
            }
197
            /******************************************************************/
B
bessani@gmail.com 已提交
198 199 200
            try { //no exception should stop the batch delivery thread
                // take a decided consensus
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS);
201 202
                if (cons == null) {
                    deliverUnlock();
B
bessani@gmail.com 已提交
203
                    continue; //go back to the start of the loop
204
                }
B
bessani@gmail.com 已提交
205
                Logger.println("(DeliveryThread.run) Consensus " + cons.getId() + " was delivered.");
206

B
bessani@gmail.com 已提交
207
                TOMMessage[] requests = extractMessagesFromDecision(cons);
P
pjsousa@gmail.com 已提交
208

B
bessani@gmail.com 已提交
209 210 211
                //cons.firstMessageProposed contains the performance counters
                if (requests[0].equals(cons.firstMessageProposed)) {
                    requests[0] = cons.firstMessageProposed;
212
                }
213

B
bessani@gmail.com 已提交
214 215
                //clean the ordered messages from the pending buffer
                tomLayer.clientsManager.requestsOrdered(requests);
B
bessani@gmail.com 已提交
216

217
                deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), requests);
P
pjsousa@gmail.com 已提交
218

B
bessani@gmail.com 已提交
219 220 221 222 223 224 225 226 227
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(cons.getId(), cons.getDecisionRound().getNumber());
                    //set this consensus as the last executed
                    tomLayer.setLastExec(cons.getId());
                    //define that end of this execution
                    tomLayer.setInExec(-1);
                }
                //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
228

B
bessani@gmail.com 已提交
229 230 231
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
                logDecision(cons);
                /********************************************************/
P
pjsousa@gmail.com 已提交
232 233
                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
B
bessani@gmail.com 已提交
234
                //TODO: Is this part necessary? If it is, can we put it inside setLastExec
P
pjsousa@gmail.com 已提交
235 236 237 238 239 240 241
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }

B
bessani@gmail.com 已提交
242 243 244
            } catch (Exception e) {
                e.printStackTrace(System.err);
            }
P
pjsousa@gmail.com 已提交
245

B
bessani@gmail.com 已提交
246 247 248 249 250
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
            deliverUnlock();
            /******************************************************************/
        }
    }
251

B
bessani@gmail.com 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
    private TOMMessage[] extractMessagesFromDecision(Consensus cons) {
        TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision();

        if (requests == null) {
            //there are no cached deserialized requests
            //this may happen if this batch proposal was not verified
            //TODO: this condition is possible?

            Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

            // obtain an array of requests from the taken consensus
            BatchReader batchReader = new BatchReader(cons.getDecision(),
                    manager.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(manager);
        } else {
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
        }

        return requests;
    }

273
    public void deliverUnordered(TOMMessage request, int regency) {
B
bessani@gmail.com 已提交
274
        MessageContext msgCtx = new MessageContext(System.currentTimeMillis(),
275
                new byte[0], regency, -1, request.getSender(), null);
B
bessani@gmail.com 已提交
276 277 278
        receiver.receiveMessage(request, msgCtx);
    }

279
    private void deliverMessages(int consId, int regency, TOMMessage[] requests) {
B
bessani@gmail.com 已提交
280 281 282 283 284 285
        TOMMessage firstRequest = requests[0];
        
        for (TOMMessage request: requests) {
            if (request.getViewID() == manager.getCurrentViewId()) {
                if (request.getReqType() == TOMMessageType.REQUEST) {
                    //normal request execution
286
                    
B
bessani@gmail.com 已提交
287 288
                    //create a context for the batch of messages to be delivered
                    MessageContext msgCtx = new MessageContext(firstRequest.timestamp, 
289
                            firstRequest.nonces, regency, consId, request.getSender(), firstRequest);
290

B
bessani@gmail.com 已提交
291 292 293 294 295 296 297 298
                    request.deliveryTime = System.nanoTime();

                    receiver.receiveOrderedMessage(request, msgCtx);
                } else if (request.getReqType() == TOMMessageType.RECONFIG) {
                    //Reconfiguration request to be processed after the batch
                    manager.enqueueUpdate(request);
                } else {
                    throw new RuntimeException("Should never reach here!");
P
pjsousa@gmail.com 已提交
299
                }
B
bessani@gmail.com 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
            } else {
                //message sender had an old view, resend the message to him
                tomLayer.getCommunication().send(new int[]{request.getSender()},
                        new TOMMessage(manager.getStaticConf().getProcessId(),
                        request.getSession(), request.getSequence(),
                        TOMUtil.getBytes(manager.getCurrentView()), manager.getCurrentViewId()));
            }
        }
    }

    private void processReconfigMessages(int consId, int decisionRoundNumber) {
        byte[] response = manager.executeUpdates(consId, decisionRoundNumber);
        TOMMessage[] dests = manager.clearUpdates();

        for (int i = 0; i < dests.length; i++) {
            tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
                    new TOMMessage(manager.getStaticConf().getProcessId(),
                    dests[i].getSession(), dests[i].getSequence(), response,
                    manager.getCurrentViewId()));
        }

        tomLayer.getCommunication().updateServersConnections();
    }

    private void logDecision(Consensus cons) {
        if (manager.getStaticConf().getCheckpointPeriod() > 0) {
            if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
                Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                byte[] state = receiver.getState();
329
                tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
B
bessani@gmail.com 已提交
330 331 332
                //TODO: possivelmente fazer mais alguma coisa
            } else {
                Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
333
                tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
B
bessani@gmail.com 已提交
334
                //TODO: possivelmente fazer mais alguma coisa
P
pjsousa@gmail.com 已提交
335 336 337 338
            }
        }
    }
}