DeliveryThread.java 13.0 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

B
bessani@gmail.com 已提交
20
import navigators.smart.tom.MessageContext;
P
pjsousa@gmail.com 已提交
21 22
import java.util.concurrent.LinkedBlockingQueue;

23
import java.util.concurrent.TimeUnit;
24 25
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
26
import navigators.smart.paxosatwar.Consensus;
27
import navigators.smart.reconfiguration.ServerViewManager;
28
import navigators.smart.statemanagment.TransferableState;
29
import navigators.smart.tom.TOMReceiver;
P
pjsousa@gmail.com 已提交
30
import navigators.smart.tom.core.messages.TOMMessage;
B
bessani@gmail.com 已提交
31
import navigators.smart.tom.core.messages.TOMMessageType;
P
pjsousa@gmail.com 已提交
32 33 34 35 36 37 38
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
39
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
40 41 42

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
43
    private TOMReceiver receiver; // Object that receives requests from clients
44
    private ServerViewManager manager;
P
pjsousa@gmail.com 已提交
45 46 47 48 49

    /**
     * Builds the delivery thread (thread name "Delivery Thread") and stores its
     * collaborators. The thread is not started by this constructor.
     *
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param manager view manager used for configuration and reconfiguration queries
     */
    public DeliveryThread(TOMLayer tomLayer, TOMReceiver receiver, ServerViewManager manager) {
        super("Delivery Thread");

        this.manager = manager;
        this.receiver = receiver;
        this.tomLayer = tomLayer;
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
67
        if (!containsGoodReconfig(cons)) {
B
bessani@gmail.com 已提交
68 69 70 71 72
            //set this consensus as the last executed
            tomLayer.setLastExec(cons.getId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        }
P
pjsousa@gmail.com 已提交
73 74
        try {
            decided.put(cons);
B
bessani@gmail.com 已提交
75
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
76 77 78 79
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
80

81
    private boolean containsGoodReconfig(Consensus cons) {
B
bessani@gmail.com 已提交
82 83 84
        TOMMessage[] decidedMessages = cons.getDeserializedDecision();

        for (TOMMessage decidedMessage : decidedMessages) {
85 86
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
                    && decidedMessage.getViewID() == manager.getCurrentViewId()) {
B
bessani@gmail.com 已提交
87 88 89 90 91 92 93
                return true;
            }
        }

        return false;
    }

94
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
95 96 97 98 99
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
100
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
101
    }
102

103 104
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
105
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
106 107 108 109 110
    }

    /**
     * Wakes up every thread waiting on the delivery condition, i.e. the delivery
     * loop parked in run() while state is being retrieved.
     * The caller must hold the delivery lock (see deliverLock()); signalling a
     * ReentrantLock condition without holding its lock throws
     * IllegalMonitorStateException.
     */
    public void canDeliver() {
        this.canDeliver.signalAll();
    }
111

112 113
    public void update(TransferableState state) {

114
        int lastCheckpointEid = state.getLastCheckpointEid();
115 116
        //int lastEid = state.getLastEid();
        int lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
117

B
bessani@gmail.com 已提交
118 119
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
                + lastCheckpointEid + " to EID " + lastEid);
120

121
        receiver.setState(state.getState());
122

B
bessani@gmail.com 已提交
123 124
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
                state.getLastCheckpointLeader());
125

126
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
127
            try {
128
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
129

B
bessani@gmail.com 已提交
130 131
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
                        state.getMessageBatch(eid).leader);
132

133
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
134

B
bessani@gmail.com 已提交
135 136
                TOMMessage[] requests = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
137

B
bessani@gmail.com 已提交
138
                tomLayer.clientsManager.requestsOrdered(requests);
139

140
                deliverMessages(eid, tomLayer.getLCManager().getLastReg(), requests);
141

B
bessani@gmail.com 已提交
142 143 144
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
145
                }
B
bessani@gmail.com 已提交
146
                //******* EDUARDO END **************//
147
            } catch (Exception e) {
B
bessani@gmail.com 已提交
148
                e.printStackTrace(System.err);
149 150 151 152 153 154 155 156 157
                if (e instanceof ArrayIndexOutOfBoundsException) {

                    
                            
                    System.out.println("Eid do ultimo checkpoint: " + state.getLastCheckpointEid());
                    System.out.println("Eid do ultimo consenso: " + state.getLastEid());
                    System.out.println("numero de mensagens supostamente no batch: " + (state.getLastEid() - state.getLastCheckpointEid() + 1));
                    System.out.println("numero de mensagens realmente no batch: " + state.getMessageBatches().length);
                }
158 159 160 161
            }

        }

162 163 164 165 166 167
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
168
            int stableConsensus = lastEid - 3;
169 170

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
171
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
172 173 174
        }

        //define that end of this execution
175
        //stateManager.setWaiting(-1);
176
        tomLayer.setNoExec();
177

178
        decided.clear();
179 180

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
181 182
    }

P
pjsousa@gmail.com 已提交
183
    /**
B
bessani@gmail.com 已提交
184 185
     * This is the code for the thread. It delivers decided consensus to the TOM
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
186 187 188 189
     */
    @Override
    public void run() {
        while (true) {
190
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
191
            deliverLock();
192
            while (tomLayer.isRetrievingState()) {
193
                Logger.println("(DeliveryThread.run) Retrieving State.");
194 195
                canDeliver.awaitUninterruptibly();
            }
196
            /******************************************************************/
B
bessani@gmail.com 已提交
197 198 199
            try { //no exception should stop the batch delivery thread
                // take a decided consensus
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS);
200 201
                if (cons == null) {
                    deliverUnlock();
B
bessani@gmail.com 已提交
202
                    continue; //go back to the start of the loop
203
                }
B
bessani@gmail.com 已提交
204
                Logger.println("(DeliveryThread.run) Consensus " + cons.getId() + " was delivered.");
205

B
bessani@gmail.com 已提交
206
                TOMMessage[] requests = extractMessagesFromDecision(cons);
P
pjsousa@gmail.com 已提交
207

B
bessani@gmail.com 已提交
208 209 210
                //cons.firstMessageProposed contains the performance counters
                if (requests[0].equals(cons.firstMessageProposed)) {
                    requests[0] = cons.firstMessageProposed;
211
                }
212

B
bessani@gmail.com 已提交
213 214
                //clean the ordered messages from the pending buffer
                tomLayer.clientsManager.requestsOrdered(requests);
B
bessani@gmail.com 已提交
215

216
                deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), requests);
P
pjsousa@gmail.com 已提交
217

B
bessani@gmail.com 已提交
218 219 220 221 222 223 224 225 226
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(cons.getId(), cons.getDecisionRound().getNumber());
                    //set this consensus as the last executed
                    tomLayer.setLastExec(cons.getId());
                    //define that end of this execution
                    tomLayer.setInExec(-1);
                }
                //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
227

B
bessani@gmail.com 已提交
228 229 230
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
                logDecision(cons);
                /********************************************************/
P
pjsousa@gmail.com 已提交
231 232
                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
B
bessani@gmail.com 已提交
233
                //TODO: Is this part necessary? If it is, can we put it inside setLastExec
P
pjsousa@gmail.com 已提交
234 235 236 237 238 239 240
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }

B
bessani@gmail.com 已提交
241 242 243
            } catch (Exception e) {
                e.printStackTrace(System.err);
            }
P
pjsousa@gmail.com 已提交
244

B
bessani@gmail.com 已提交
245 246 247 248 249
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
            deliverUnlock();
            /******************************************************************/
        }
    }
250

B
bessani@gmail.com 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
    /**
     * Returns the requests contained in a decided consensus, preferring the array
     * already deserialized for the proposal and deserializing the raw decision
     * only when no cached array exists.
     *
     * @param cons decided consensus
     * @return the requests of the decided batch
     */
    private TOMMessage[] extractMessagesFromDecision(Consensus cons) {
        TOMMessage[] cached = cons.getDeserializedDecision();

        if (cached != null) {
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
            return cached;
        }

        //there are no cached deserialized requests
        //this may happen if this batch proposal was not verified
        //TODO: this condition is possible?
        Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

        // rebuild the array of requests from the raw decision bytes
        return new BatchReader(cons.getDecision(),
                manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
    }

272
    public void deliverUnordered(TOMMessage request, int regency) {
B
bessani@gmail.com 已提交
273
        MessageContext msgCtx = new MessageContext(System.currentTimeMillis(),
274
                new byte[0], regency, -1, request.getSender(), null);
275
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
276 277
    }

278
    private void deliverMessages(int consId, int regency, TOMMessage[] requests) {
279
    	receiver.receiveMessages(consId, regency, requests);
B
bessani@gmail.com 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
    }

    /**
     * Applies the reconfiguration updates pending in the view manager, sends the
     * resulting response to the sender of every update request, and finally asks
     * the communication system to refresh its server connections.
     *
     * @param consId consensus id handed to the view manager when executing the updates
     * @param decisionRoundNumber round number handed to the view manager
     */
    private void processReconfigMessages(int consId, int decisionRoundNumber) {
        final byte[] response = manager.executeUpdates(consId, decisionRoundNumber);
        final TOMMessage[] requesters = manager.clearUpdates();

        for (TOMMessage requester : requesters) {
            // the reply mirrors the session and sequence of the request it answers
            TOMMessage reply = new TOMMessage(manager.getStaticConf().getProcessId(),
                    requester.getSession(), requester.getSequence(), response,
                    manager.getCurrentViewId());

            tomLayer.getCommunication().send(new int[]{requester.getSender()}, reply);
        }

        tomLayer.getCommunication().updateServersConnections();
    }

    private void logDecision(Consensus cons) {
        if (manager.getStaticConf().getCheckpointPeriod() > 0) {
            if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
                Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                byte[] state = receiver.getState();
301
                tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
B
bessani@gmail.com 已提交
302 303 304
                //TODO: possivelmente fazer mais alguma coisa
            } else {
                Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
305
                tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
B
bessani@gmail.com 已提交
306
                //TODO: possivelmente fazer mais alguma coisa
P
pjsousa@gmail.com 已提交
307 308 309 310
            }
        }
    }
}