DeliveryThread.java 13.1 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

B
bessani@gmail.com 已提交
20
import navigators.smart.tom.MessageContext;
P
pjsousa@gmail.com 已提交
21 22
import java.util.concurrent.LinkedBlockingQueue;

23
import java.util.concurrent.TimeUnit;
24 25
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
26
import navigators.smart.paxosatwar.Consensus;
27
import navigators.smart.reconfiguration.ServerViewManager;
28
import navigators.smart.statemanagment.TransferableState;
29
import navigators.smart.tom.TOMReceiver;
P
pjsousa@gmail.com 已提交
30
import navigators.smart.tom.core.messages.TOMMessage;
B
bessani@gmail.com 已提交
31
import navigators.smart.tom.core.messages.TOMMessageType;
P
pjsousa@gmail.com 已提交
32 33 34 35 36 37 38
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
39
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
40 41 42

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
43
    private TOMReceiver receiver; // Object that receives requests from clients
44
    private ServerViewManager manager;
P
pjsousa@gmail.com 已提交
45 46 47 48 49

    /**
     * Builds the delivery thread (named "Delivery Thread") and stores its collaborators.
     * The thread is not started here.
     *
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param manager view manager queried for the static configuration and for pending reconfigurations
     */
    public DeliveryThread(TOMLayer tomLayer, TOMReceiver receiver, ServerViewManager manager) {
        super("Delivery Thread");

        //******* EDUARDO BEGIN **************//
        this.manager = manager;
        //******* EDUARDO END **************//
        this.receiver = receiver;
        this.tomLayer = tomLayer;
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
67
        if (!containsGoodReconfig(cons)) {
68 69

            Logger.println("(DeliveryThread.delivery) Consensus ID " + cons.getId() + " does not contain good reconfiguration");
B
bessani@gmail.com 已提交
70 71 72 73 74
            //set this consensus as the last executed
            tomLayer.setLastExec(cons.getId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        }
P
pjsousa@gmail.com 已提交
75 76
        try {
            decided.put(cons);
B
bessani@gmail.com 已提交
77
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
78 79 80 81
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
82

83
    private boolean containsGoodReconfig(Consensus cons) {
B
bessani@gmail.com 已提交
84 85 86
        TOMMessage[] decidedMessages = cons.getDeserializedDecision();

        for (TOMMessage decidedMessage : decidedMessages) {
87 88
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
                    && decidedMessage.getViewID() == manager.getCurrentViewId()) {
B
bessani@gmail.com 已提交
89 90 91 92 93 94 95
                return true;
            }
        }

        return false;
    }

R
reiser@cs.fau.de 已提交
96
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
97 98 99 100 101
    /** Lock held by the delivery loop while it processes a decided consensus. */
    private final ReentrantLock deliverLock = new ReentrantLock();
    /** Condition of {@code deliverLock} on which the delivery loop waits while state is being retrieved. */
    private final Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
102
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
103
    }
104

105 106
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
107
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
108 109 110 111 112
    }

    /**
     * Wakes up every thread waiting on the delivery condition (the delivery loop waits
     * on it while the TOM layer reports that state is being retrieved).
     * The condition belongs to the delivery lock, so the caller must hold that lock,
     * e.g. by calling {@link #deliverLock()} first.
     */
    public void canDeliver() {
        this.canDeliver.signalAll();
    }
113

114 115
    public void update(TransferableState state) {

116
        int lastCheckpointEid = state.getLastCheckpointEid();
117 118
        //int lastEid = state.getLastEid();
        int lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
119

B
bessani@gmail.com 已提交
120 121
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
                + lastCheckpointEid + " to EID " + lastEid);
122

123
        receiver.setState(state.getState());
124

B
bessani@gmail.com 已提交
125 126
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
                state.getLastCheckpointLeader());
127

128
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
129
            try {
130
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
131

B
bessani@gmail.com 已提交
132 133
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
                        state.getMessageBatch(eid).leader);
134

135
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
136

B
bessani@gmail.com 已提交
137 138
                TOMMessage[] requests = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
139

B
bessani@gmail.com 已提交
140
                tomLayer.clientsManager.requestsOrdered(requests);
141

142
                deliverMessages(eid, tomLayer.getLCManager().getLastReg(), false, requests);
143

B
bessani@gmail.com 已提交
144 145 146
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
147
                }
B
bessani@gmail.com 已提交
148
                //******* EDUARDO END **************//
149
            } catch (Exception e) {
B
bessani@gmail.com 已提交
150
                e.printStackTrace(System.err);
151 152 153 154 155 156 157 158 159
                if (e instanceof ArrayIndexOutOfBoundsException) {

                    
                            
                    System.out.println("Eid do ultimo checkpoint: " + state.getLastCheckpointEid());
                    System.out.println("Eid do ultimo consenso: " + state.getLastEid());
                    System.out.println("numero de mensagens supostamente no batch: " + (state.getLastEid() - state.getLastCheckpointEid() + 1));
                    System.out.println("numero de mensagens realmente no batch: " + state.getMessageBatches().length);
                }
160 161 162 163
            }

        }

164 165 166 167 168 169
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
170
            int stableConsensus = lastEid - 3;
171 172

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
173
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
174 175 176
        }

        //define that end of this execution
177
        //stateManager.setWaiting(-1);
178
        tomLayer.setNoExec();
179

180
        decided.clear();
181 182

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
183 184
    }

P
pjsousa@gmail.com 已提交
185
    /**
B
bessani@gmail.com 已提交
186 187
     * This is the code for the thread. It delivers decided consensus to the TOM
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
188 189 190 191
     */
    @Override
    public void run() {
        while (true) {
R
reiser@cs.fau.de 已提交
192
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
193
            deliverLock();
194
            while (tomLayer.isRetrievingState()) {
195
                Logger.println("(DeliveryThread.run) Retrieving State.");
196 197
                canDeliver.awaitUninterruptibly();
            }
198
            /******************************************************************/
B
bessani@gmail.com 已提交
199 200 201
            try { //no exception should stop the batch delivery thread
                // take a decided consensus
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS);
202 203
                if (cons == null) {
                    deliverUnlock();
B
bessani@gmail.com 已提交
204
                    continue; //go back to the start of the loop
205
                }
B
bessani@gmail.com 已提交
206
                Logger.println("(DeliveryThread.run) Consensus " + cons.getId() + " was delivered.");
207

B
bessani@gmail.com 已提交
208
                TOMMessage[] requests = extractMessagesFromDecision(cons);
P
pjsousa@gmail.com 已提交
209

B
bessani@gmail.com 已提交
210 211 212
                //cons.firstMessageProposed contains the performance counters
                if (requests[0].equals(cons.firstMessageProposed)) {
                    requests[0] = cons.firstMessageProposed;
213
                }
214

B
bessani@gmail.com 已提交
215 216
                //clean the ordered messages from the pending buffer
                tomLayer.clientsManager.requestsOrdered(requests);
B
bessani@gmail.com 已提交
217

218
                deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), true, requests);
P
pjsousa@gmail.com 已提交
219

B
bessani@gmail.com 已提交
220 221 222 223 224 225 226 227 228
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(cons.getId(), cons.getDecisionRound().getNumber());
                    //set this consensus as the last executed
                    tomLayer.setLastExec(cons.getId());
                    //define that end of this execution
                    tomLayer.setInExec(-1);
                }
                //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
229

R
reiser@cs.fau.de 已提交
230
                /** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */
B
bessani@gmail.com 已提交
231 232
                logDecision(cons);
                /********************************************************/
P
pjsousa@gmail.com 已提交
233 234
                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
B
bessani@gmail.com 已提交
235
                //TODO: Is this part necessary? If it is, can we put it inside setLastExec
P
pjsousa@gmail.com 已提交
236 237 238 239 240 241 242
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }

B
bessani@gmail.com 已提交
243 244 245
            } catch (Exception e) {
                e.printStackTrace(System.err);
            }
P
pjsousa@gmail.com 已提交
246

R
reiser@cs.fau.de 已提交
247
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
B
bessani@gmail.com 已提交
248 249 250 251
            deliverUnlock();
            /******************************************************************/
        }
    }
252

B
bessani@gmail.com 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
    /**
     * Returns the requests contained in the decision of the given consensus.
     *
     * <p>The deserialized requests cached in the consensus are used when present.
     * Otherwise (this may happen if this batch proposal was not verified) the decided
     * batch is deserialized here, with signature handling enabled when the static
     * configuration's "use signatures" value equals 1.
     *
     * @param cons decided consensus
     * @return the requests of the decision
     */
    private TOMMessage[] extractMessagesFromDecision(Consensus cons) {
        TOMMessage[] cached = cons.getDeserializedDecision();

        if (cached != null) {
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
            return cached;
        }

        //there are no cached deserialized requests
        //TODO: this condition is possible?
        Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

        // obtain an array of requests from the taken consensus
        BatchReader reader = new BatchReader(cons.getDecision(),
                manager.getStaticConf().getUseSignatures() == 1);
        return reader.deserialiseRequests(manager);
    }

274
    public void deliverUnordered(TOMMessage request, int regency) {
B
bessani@gmail.com 已提交
275
        MessageContext msgCtx = new MessageContext(System.currentTimeMillis(),
276
                new byte[0], regency, -1, request.getSender(), null);
277
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
278 279
    }

280 281
    private void deliverMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests) {
        receiver.receiveMessages(consId, regency, fromConsensus, requests);
B
bessani@gmail.com 已提交
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
    }

    /**
     * Executes the pending view updates and answers the requests that triggered them.
     *
     * <p>The byte array produced by executing the updates is sent, wrapped in a new
     * TOMMessage carrying this process's id and the current view id, to the sender of
     * every request returned when the pending updates are cleared. Finally the server
     * connections are updated.
     *
     * @param consId id of the consensus passed to the update execution
     * @param decisionRoundNumber round number passed to the update execution
     */
    private void processReconfigMessages(int consId, int decisionRoundNumber) {
        byte[] response = manager.executeUpdates(consId, decisionRoundNumber);
        TOMMessage[] dests = manager.clearUpdates();

        for (TOMMessage dest : dests) {
            tomLayer.getCommunication().send(new int[]{dest.getSender()},
                    new TOMMessage(manager.getStaticConf().getProcessId(),
                    dest.getSession(), dest.getSequence(), response,
                    manager.getCurrentViewId()));
        }

        tomLayer.getCommunication().updateServersConnections();
    }

    private void logDecision(Consensus cons) {
        if (manager.getStaticConf().getCheckpointPeriod() > 0) {
            if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
                Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                byte[] state = receiver.getState();
303
                tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
B
bessani@gmail.com 已提交
304 305 306
                //TODO: possivelmente fazer mais alguma coisa
            } else {
                Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
307
                tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
B
bessani@gmail.com 已提交
308
                //TODO: possivelmente fazer mais alguma coisa
P
pjsousa@gmail.com 已提交
309 310 311 312
            }
        }
    }
}