DeliveryThread.java 13.4 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
 * 
 * This file is part of SMaRt.
 * 
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.tom.core;

B
bessani@gmail.com 已提交
20
import navigators.smart.tom.MessageContext;
P
pjsousa@gmail.com 已提交
21 22
import java.util.concurrent.LinkedBlockingQueue;

23
import java.util.concurrent.TimeUnit;
24 25
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
P
pjsousa@gmail.com 已提交
26
import navigators.smart.paxosatwar.Consensus;
27
import navigators.smart.reconfiguration.ReconfigurationManager;
28
import navigators.smart.statemanagment.TransferableState;
P
pjsousa@gmail.com 已提交
29 30
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
B
bessani@gmail.com 已提交
31
import navigators.smart.tom.core.messages.TOMMessageType;
P
pjsousa@gmail.com 已提交
32 33 34 35 36 37 38 39
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
40
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
41 42 43 44

    private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
    private TOMLayer tomLayer; // TOM layer
    private TOMRequestReceiver receiver; // Object that receives requests from clients
45
    private ReconfigurationManager manager;
P
pjsousa@gmail.com 已提交
46 47 48 49 50 51 52

    /**
     * Creates a new instance of DeliveryThread. The thread is named
     * "Delivery Thread"; it is not started here.
     *
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param manager Reconfiguration manager, consulted for the static
     *                configuration, the current view and pending reconfiguration updates
     */
    public DeliveryThread(TOMLayer tomLayer, TOMRequestReceiver receiver, ReconfigurationManager manager) {
        super("Delivery Thread");

        this.tomLayer = tomLayer;
        this.receiver = receiver;
        //******* EDUARDO BEGIN **************//
        this.manager = manager;
        //******* EDUARDO END **************//
    }

    /**
     * Invoked by the TOM layer, to deliver a decide consensus
     * @param cons Consensus established as being decided
     */
    public void delivery(Consensus cons) {
B
bessani@gmail.com 已提交
68 69 70 71 72 73 74
        if (!containsReconfig(cons)) {
            //set this consensus as the last executed
            tomLayer.setLastExec(cons.getId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        }

P
pjsousa@gmail.com 已提交
75 76
        try {
            decided.put(cons);
B
bessani@gmail.com 已提交
77
            Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
78 79 80 81
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94

    /**
     * Tells whether the decided batch holds a reconfiguration request that will
     * actually be queued for execution.
     *
     * deliverMessages() only enqueues a RECONFIG request when its view id equals
     * the manager's current view id, and run() only finishes the execution
     * (setLastExec / setInExec) of a reconfiguration consensus when the manager
     * has queued updates. The same view-id test is therefore applied here;
     * otherwise a RECONFIG request carrying a different view id would make
     * delivery() skip those calls without run() ever performing them.
     *
     * @param cons the decided consensus
     * @return true if at least one request is a RECONFIG for the current view
     */
    private boolean containsReconfig(Consensus cons) {
        // NOTE(review): extractMessagesFromDecision() treats a null result of
        // getDeserializedDecision() as possible; a null here would raise a
        // NullPointerException in the loop below - confirm it cannot happen on this path.
        TOMMessage[] decidedMessages = cons.getDeserializedDecision();

        for (TOMMessage decidedMessage : decidedMessages) {
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
                    && decidedMessage.getViewID() == manager.getCurrentViewId()) {
                return true;
            }
        }

        return false;
    }

95
    /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
96 97 98 99 100
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
        deliverLock.lock();
B
bessani@gmail.com 已提交
101
        //Logger.println("(DeliveryThread.deliverLock) Deliver lock obtained");
102
    }
103

104 105
    public void deliverUnlock() {
        deliverLock.unlock();
B
bessani@gmail.com 已提交
106
        //Logger.println("(DeliveryThread.deliverUnlock) Deliver Released");
107 108 109 110 111
    }

    /**
     * Signals every thread waiting on the canDeliver condition. The delivery
     * thread waits on that condition while the TOM layer reports that it is
     * retrieving state.
     *
     * NOTE(review): signalling a condition of a ReentrantLock requires the
     * calling thread to hold that lock; callers presumably invoke
     * deliverLock() first - confirm against the call sites.
     */
    public void canDeliver() {
        canDeliver.signalAll();
    }
112

113 114
    public void update(TransferableState state) {

115 116
        int lastCheckpointEid = state.getLastCheckpointEid();
        int lastEid = state.getLastEid();
117

B
bessani@gmail.com 已提交
118 119
        Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
                + lastCheckpointEid + " to EID " + lastEid);
120

121
        receiver.setState(state.getState());
122

B
bessani@gmail.com 已提交
123 124
        tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
                state.getLastCheckpointLeader());
125

126
        for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
127
            try {
128
                byte[] batch = state.getMessageBatch(eid).batch; // take a batch
129

B
bessani@gmail.com 已提交
130 131
                tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
                        state.getMessageBatch(eid).leader);
132

133
                Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
134

B
bessani@gmail.com 已提交
135 136
                TOMMessage[] requests = new BatchReader(batch,
                        manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
137

B
bessani@gmail.com 已提交
138
                tomLayer.clientsManager.requestsOrdered(requests);
139

B
bessani@gmail.com 已提交
140
                deliverMessages(eid, requests);
141

B
bessani@gmail.com 已提交
142 143 144
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
145
                }
B
bessani@gmail.com 已提交
146
                //******* EDUARDO END **************//
147
            } catch (Exception e) {
B
bessani@gmail.com 已提交
148
                e.printStackTrace(System.err);
149 150 151 152
            }

        }

153 154 155 156 157 158
        //set this consensus as the last executed
        tomLayer.setLastExec(lastEid);

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
        if (lastEid > 2) {
159
            int stableConsensus = lastEid - 3;
160 161

            //tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
162
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
163 164 165
        }

        //define that end of this execution
166
        //stateManager.setWaiting(-1);
167
        tomLayer.setNoExec();
168

169
        decided.clear();
170 171

        Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
172 173
    }

P
pjsousa@gmail.com 已提交
174
    /**
B
bessani@gmail.com 已提交
175 176
     * This is the code for the thread. It delivers decided consensus to the TOM
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
177 178 179 180
     */
    @Override
    public void run() {
        while (true) {
181
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
182
            deliverLock();
183

184 185 186
            while (tomLayer.isRetrievingState()) {
                canDeliver.awaitUninterruptibly();
            }
187
            /******************************************************************/
B
bessani@gmail.com 已提交
188 189 190
            try { //no exception should stop the batch delivery thread
                // take a decided consensus
                Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS);
191 192
                if (cons == null) {
                    deliverUnlock();
B
bessani@gmail.com 已提交
193
                    continue; //go back to the start of the loop
194
                }
B
bessani@gmail.com 已提交
195
                Logger.println("(DeliveryThread.run) Consensus " + cons.getId() + " was delivered.");
196

B
bessani@gmail.com 已提交
197
                TOMMessage[] requests = extractMessagesFromDecision(cons);
P
pjsousa@gmail.com 已提交
198

B
bessani@gmail.com 已提交
199 200 201
                //cons.firstMessageProposed contains the performance counters
                if (requests[0].equals(cons.firstMessageProposed)) {
                    requests[0] = cons.firstMessageProposed;
202
                }
203

B
bessani@gmail.com 已提交
204 205
                //clean the ordered messages from the pending buffer
                tomLayer.clientsManager.requestsOrdered(requests);
B
bessani@gmail.com 已提交
206

B
bessani@gmail.com 已提交
207
                deliverMessages(cons.getId(), requests);
P
pjsousa@gmail.com 已提交
208

B
bessani@gmail.com 已提交
209 210 211 212 213 214 215 216 217
                //******* EDUARDO BEGIN **************//
                if (manager.hasUpdates()) {
                    processReconfigMessages(cons.getId(), cons.getDecisionRound().getNumber());
                    //set this consensus as the last executed
                    tomLayer.setLastExec(cons.getId());
                    //define that end of this execution
                    tomLayer.setInExec(-1);
                }
                //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
218

B
bessani@gmail.com 已提交
219 220 221
                /** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
                logDecision(cons);
                /********************************************************/
P
pjsousa@gmail.com 已提交
222 223
                //define the last stable consensus... the stable consensus can
                //be removed from the leaderManager and the executionManager
B
bessani@gmail.com 已提交
224
                //TODO: Is this part necessary? If it is, can we put it inside setLastExec
P
pjsousa@gmail.com 已提交
225 226 227 228 229 230 231
                if (cons.getId() > 2) {
                    int stableConsensus = cons.getId() - 3;

                    tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                    tomLayer.execManager.removeExecution(stableConsensus);
                }

B
bessani@gmail.com 已提交
232 233 234
            } catch (Exception e) {
                e.printStackTrace(System.err);
            }
P
pjsousa@gmail.com 已提交
235

B
bessani@gmail.com 已提交
236 237 238 239 240
            /** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
            deliverUnlock();
            /******************************************************************/
        }
    }
241

B
bessani@gmail.com 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
    /**
     * Obtains the requests decided in the given consensus.
     *
     * The deserialized requests cached in the consensus are used when present;
     * otherwise the decision is deserialized with a BatchReader, checking
     * signatures when the static configuration has them enabled.
     *
     * @param cons the decided consensus
     * @return the requests contained in the decision
     */
    private TOMMessage[] extractMessagesFromDecision(Consensus cons) {
        final TOMMessage[] cached = cons.getDeserializedDecision();

        if (cached != null) {
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
            return cached;
        }

        //there are no cached deserialized requests
        //this may happen if this batch proposal was not verified
        //TODO: this condition is possible?
        Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");

        // obtain an array of requests from the taken consensus
        return new BatchReader(cons.getDecision(),
                manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
    }

    /**
     * Hands a request that did not go through ordering to the receiver.
     * The message context carries the current wall-clock time, an empty byte
     * array in place of nonces, -1 as consensus id, the request's sender and
     * no first-request reference.
     *
     * @param request the request to deliver
     */
    public void deliverUnordered(TOMMessage request) {
        final long now = System.currentTimeMillis();
        final byte[] noNonces = new byte[0];
        final MessageContext context = new MessageContext(now, noNonces, -1, request.getSender(), null);

        receiver.receiveMessage(request, context);
    }

    private void deliverMessages(int consId, TOMMessage[] requests) {
        TOMMessage firstRequest = requests[0];
        
        for (TOMMessage request: requests) {
            if (request.getViewID() == manager.getCurrentViewId()) {
                if (request.getReqType() == TOMMessageType.REQUEST) {
                    //normal request execution
277
                    
B
bessani@gmail.com 已提交
278 279 280
                    //create a context for the batch of messages to be delivered
                    MessageContext msgCtx = new MessageContext(firstRequest.timestamp, 
                            firstRequest.nonces, consId, request.getSender(), firstRequest);
281

B
bessani@gmail.com 已提交
282 283 284 285 286 287 288 289
                    request.deliveryTime = System.nanoTime();

                    receiver.receiveOrderedMessage(request, msgCtx);
                } else if (request.getReqType() == TOMMessageType.RECONFIG) {
                    //Reconfiguration request to be processed after the batch
                    manager.enqueueUpdate(request);
                } else {
                    throw new RuntimeException("Should never reach here!");
P
pjsousa@gmail.com 已提交
290
                }
B
bessani@gmail.com 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
            } else {
                //message sender had an old view, resend the message to him
                tomLayer.getCommunication().send(new int[]{request.getSender()},
                        new TOMMessage(manager.getStaticConf().getProcessId(),
                        request.getSession(), request.getSequence(),
                        TOMUtil.getBytes(manager.getCurrentView()), manager.getCurrentViewId()));
            }
        }
    }

    /**
     * Executes the reconfiguration updates queued in the manager and answers
     * the requesters.
     *
     * It first waits for the receiver to finish the requests it is processing,
     * then executes the updates, sends the resulting response to the sender of
     * every update that was queued, and finally asks the communication system
     * to update the server connections.
     *
     * @param consId consensus id passed on to the manager
     * @param decisionRoundNumber round number passed on to the manager
     */
    private void processReconfigMessages(int consId, int decisionRoundNumber) {
        receiver.waitForProcessingRequests();

        final byte[] reply = manager.executeUpdates(consId, decisionRoundNumber);

        // clearUpdates() hands back the update requests that were queued
        for (TOMMessage requester : manager.clearUpdates()) {
            tomLayer.getCommunication().send(new int[]{requester.getSender()},
                    new TOMMessage(manager.getStaticConf().getProcessId(),
                    requester.getSession(), requester.getSequence(), reply,
                    manager.getCurrentViewId()));
        }

        tomLayer.getCommunication().updateServersConnections();
    }

    private void logDecision(Consensus cons) {
        if (manager.getStaticConf().getCheckpointPeriod() > 0) {
            if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
                Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
                byte[] state = receiver.getState();
                tomLayer.saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                //TODO: possivelmente fazer mais alguma coisa
            } else {
                Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
                tomLayer.saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
                //TODO: possivelmente fazer mais alguma coisa
P
pjsousa@gmail.com 已提交
328 329 330 331
            }
        }
    }
}