DeliveryThread.java 14.2 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14 15
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
P
pjsousa@gmail.com 已提交
17

18
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
19 20
import java.util.concurrent.LinkedBlockingQueue;

21
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
24

25
import bftsmart.consensus.Decision;
26
import bftsmart.reconfiguration.ServerViewController;
27
import bftsmart.statemanagement.ApplicationState;
28
import bftsmart.tom.MessageContext;
29
import bftsmart.tom.ServiceReplica;
30 31
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
32
import bftsmart.tom.leaderchange.CertifiedDecision;
33 34
import bftsmart.tom.server.Recoverable;
import bftsmart.tom.util.BatchReader;
35 36 37

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
P
pjsousa@gmail.com 已提交
38 39 40 41 42

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
43
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
44

45 46
    private Logger logger = LoggerFactory.getLogger(this.getClass());

47
    private boolean doWork = true;
48
    private int lastReconfig = -2;
49
    private int currentDecisions = 0;
J
Joao Sousa 已提交
50 51 52 53 54 55 56
    private final LinkedBlockingQueue<Decision> decided; 
    private final TOMLayer tomLayer; // TOM layer
    private final ServiceReplica receiver; // Object that receives requests from clients
    private final Recoverable recoverer; // Object that uses state transfer
    private final ServerViewController controller;
    private final Lock decidedLock = new ReentrantLock();
    private final Condition notEmptyQueue = decidedLock.newCondition();
P
pjsousa@gmail.com 已提交
57 58 59 60 61 62

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     */
63
    public DeliveryThread(TOMLayer tomLayer, ServiceReplica receiver, Recoverable recoverer, ServerViewController controller) {
P
pjsousa@gmail.com 已提交
64
        super("Delivery Thread");
J
Joao Sousa 已提交
65
        this.decided = new LinkedBlockingQueue<>();
P
pjsousa@gmail.com 已提交
66 67 68

        this.tomLayer = tomLayer;
        this.receiver = receiver;
69
        this.recoverer = recoverer;
70
        //******* EDUARDO BEGIN **************//
71
        this.controller = controller;
72
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
73 74
    }

75 76 77 78 79
    
   public Recoverable getRecoverer() {
        return recoverer;
    }
   
80 81
   public int getPendingDecisions() {
       return decided.size() + currentDecisions;
82 83
   }
   
84 85 86
   public ReplyManager getReplyManager() {
       return receiver.getReplyManager();
   }
P
pjsousa@gmail.com 已提交
87
    /**
88 89
     * Invoked by the TOM layer, to deliver a decision
     * @param dec Decision established from the consensus
P
pjsousa@gmail.com 已提交
90
     */
91
    public void delivery(Decision dec) {
92
        
P
pjsousa@gmail.com 已提交
93
        try {
J
Joao Sousa 已提交
94
            decidedLock.lock();
95
            decided.put(dec);
96
            
J
Joao Sousa 已提交
97
            // clean the ordered messages from the pending buffer
98
            TOMMessage[] requests = extractMessagesFromDecision(dec);
99
            tomLayer.clientsManager.requestsOrdered(requests);
100
            
101 102
            notEmptyQueue.signalAll();
            decidedLock.unlock();
103
            logger.debug("Consensus " + dec.getConsensusId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
104
        } catch (Exception e) {
105
            logger.error("Could not insert decision into decided queue",e);
P
pjsousa@gmail.com 已提交
106
        }
107
        
108
        if (!containsReconfig(dec)) {
109

110
            logger.debug("Decision from consensus " + dec.getConsensusId() + " does not contain reconfiguration");
111 112 113 114 115
            //set this decision as the last one from this replica
            tomLayer.setLastExec(dec.getConsensusId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        } //else if (tomLayer.controller.getStaticConf().getProcessId() == 0) System.exit(0);
116 117 118 119
        else {
            logger.debug("Decision from consensus " + dec.getConsensusId() + " has reconfiguration");
            lastReconfig = dec.getConsensusId();
        }
P
pjsousa@gmail.com 已提交
120
    }
B
bessani@gmail.com 已提交
121

122
    private boolean containsReconfig(Decision dec) {
123
        TOMMessage[] decidedMessages = dec.getDeserializedValue();
B
bessani@gmail.com 已提交
124 125

        for (TOMMessage decidedMessage : decidedMessages) {
126
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
127
                    && decidedMessage.getViewID() == controller.getCurrentViewId()) {
B
bessani@gmail.com 已提交
128 129 130 131 132 133
                return true;
            }
        }
        return false;
    }

R
reiser@cs.fau.de 已提交
134
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
135 136 137 138
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
139
    	// release the delivery lock to avoid blocking on state transfer
140 141 142 143
        decidedLock.lock();
        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
144
    	
145 146
        deliverLock.lock();
    }
147

148 149 150 151 152 153 154
    public void deliverUnlock() {
        deliverLock.unlock();
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
155

156
    public void update(ApplicationState state) {
157
       
158
        int lastCID =  recoverer.setState(state);
159

160
        //set this decision as the last one from this replica
161
        logger.info("Setting last CID to " + lastCID);
162
        tomLayer.setLastExec(lastCID);
163 164 165

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
166 167
        if (lastCID > 2) {
            int stableConsensus = lastCID - 3;
168
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
169 170 171
        }

        //define that end of this execution
172
        //stateManager.setWaiting(-1);
173
        tomLayer.setNoExec();
174

175
        logger.info("Current decided size: " + decided.size());
176
        decided.clear();
177

178
        logger.info("All finished up to " + lastCID);
179 180
    }

P
pjsousa@gmail.com 已提交
181
    /**
182
     * This is the code for the thread. It delivers decisions to the TOM
B
bessani@gmail.com 已提交
183
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
184 185 186
     */
    @Override
    public void run() {
187
        while (doWork) {
188 189 190
            
            currentDecisions = 0;
            
191 192 193
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverLock();
            while (tomLayer.isRetrievingState()) {
194
                logger.info("Retrieving State");
195
                canDeliver.awaitUninterruptibly();
S
snakejerusalem 已提交
196 197
                
                if (tomLayer.getLastExec() == -1)
198
                    logger.info("Ready to process operations");
199 200 201 202 203 204 205
            }
            try {
                ArrayList<Decision> decisions = new ArrayList<Decision>();
                decidedLock.lock();
                if(decided.isEmpty()) {
                    notEmptyQueue.await();
                }
206
                
207 208
                logger.debug("Current size of the decided queue: {}", decided.size());

209 210 211 212 213 214
                if (controller.getStaticConf().getSameBatchSize()) {
                    decided.drainTo(decisions, 1);
                } else {
                    decided.drainTo(decisions);
                }
                
215
                decidedLock.unlock();
216 217 218
                
                if (!doWork) break;
                
219
                if (decisions.size() > 0) {
220 221 222
                    
                    currentDecisions = decisions.size();
                    
223 224 225 226
                    TOMMessage[][] requests = new TOMMessage[decisions.size()][];
                    int[] consensusIds = new int[requests.length];
                    int[] leadersIds = new int[requests.length];
                    int[] regenciesIds = new int[requests.length];
227 228
                    CertifiedDecision[] cDecs;
                    cDecs = new CertifiedDecision[requests.length];
229 230 231 232 233 234 235
                    int count = 0;
                    for (Decision d : decisions) {
                        requests[count] = extractMessagesFromDecision(d);
                        consensusIds[count] = d.getConsensusId();
                        leadersIds[count] = d.getLeader();
                        regenciesIds[count] = d.getRegency();

236
                        CertifiedDecision cDec = new CertifiedDecision(this.controller.getStaticConf().getProcessId(),
237
                                d.getConsensusId(), d.getValue(), d.getDecisionEpoch().proof);
238
                        cDecs[count] = cDec;
239 240

                        // cons.firstMessageProposed contains the performance counters
241
                        if (requests[count].length > 0 && requests[count][0].equals(d.firstMessageProposed)) {
242
                            long time = requests[count][0].timestamp;
243 244
                            long seed = requests[count][0].seed;
                            int numOfNonces = requests[count][0].numOfNonces;
245 246
                            requests[count][0] = d.firstMessageProposed;
                            requests[count][0].timestamp = time;
247 248
                            requests[count][0].seed = seed;
                            requests[count][0].numOfNonces = numOfNonces;
249 250 251 252 253 254 255 256
                        }

                        count++;
                    }

                    Decision lastDecision = decisions.get(decisions.size() - 1);

                    if (requests != null && requests.length > 0) {
257
                        deliverMessages(consensusIds, regenciesIds, leadersIds, cDecs, requests);
258 259 260 261

                        // ******* EDUARDO BEGIN ***********//
                        if (controller.hasUpdates()) {
                            processReconfigMessages(lastDecision.getConsensusId());
262
                        }
263
                        if (lastReconfig > -2 && lastReconfig <= lastDecision.getConsensusId()) {
264
                            
265
                            // set the consensus associated to the last decision as the last executed
266
                            logger.debug("Setting last executed consensus to " + lastDecision.getConsensusId());
267 268 269 270
                            tomLayer.setLastExec(lastDecision.getConsensusId());
                            // define that end of this execution
                            tomLayer.setInExec(-1);
                            // ******* EDUARDO END **************//
271
                            
272
                            lastReconfig = -2;
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
                        }
                    }

                    // define the last stable consensus... the stable consensus can
                    // be removed from the leaderManager and the executionManager
                    // TODO: Is this part necessary? If it is, can we put it
                    // inside setLastExec
                    int cid = lastDecision.getConsensusId();
                    if (cid > 2) {
                        int stableConsensus = cid - 3;

                        tomLayer.execManager.removeConsensus(stableConsensus);
                    }
                }
            } catch (Exception e) {
288
                    logger.error("Error while processing decision",e);
289 290 291 292 293 294
            }

            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverUnlock();
            /******************************************************************/
        }
295
        logger.info("DeliveryThread stopped.");
296

B
bessani@gmail.com 已提交
297
    }
298
    
299 300
    private TOMMessage[] extractMessagesFromDecision(Decision dec) {
    	TOMMessage[] requests = (TOMMessage[]) dec.getDeserializedValue();
301
    	if (requests == null) {
302 303 304
            // there are no cached deserialized requests
            // this may happen if this batch proposal was not verified
            // TODO: this condition is possible?
305

306
            logger.debug("Interpreting and verifying batched requests.");
307

308 309 310 311
            // obtain an array of requests from the decisions obtained
            BatchReader batchReader = new BatchReader(dec.getValue(),
                            controller.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(controller);
312
    	} else {
313
            logger.debug("Using cached requests from the propose.");
314 315 316
    	}

    	return requests;
B
bessani@gmail.com 已提交
317
    }
318
    
319
    protected void deliverUnordered(TOMMessage request, int regency) {
S
snakejerusalem 已提交
320

321
        MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
322
                request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.getAckSeq(), request.serializedMessageSignature,
323 324 325
                System.currentTimeMillis(), 0, 0, regency, -1, -1, null, null, false); // Since the request is unordered,
                                                                                       // there is no consensus info to pass
        
326
        msgCtx.readOnly = true;
327
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
328 329
    }

330 331
    private void deliverMessages(int consId[], int regencies[], int leaders[], CertifiedDecision[] cDecs, TOMMessage[][] requests) {
        receiver.receiveMessages(consId, regencies, leaders, cDecs, requests);
B
bessani@gmail.com 已提交
332 333
    }

334 335
    private void processReconfigMessages(int consId) {
        byte[] response = controller.executeUpdates(consId);
336
        TOMMessage[] dests = controller.clearUpdates();
B
bessani@gmail.com 已提交
337

338 339 340 341
        if (controller.getCurrentView().isMember(receiver.getId())) {
            for (int i = 0; i < dests.length; i++) {
                tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
                        new TOMMessage(controller.getStaticConf().getProcessId(),
342
                        dests[i].getSession(), dests[i].getSequence(), dests[i].getOperationId(), response,
343 344
                        controller.getCurrentViewId(),TOMMessageType.RECONFIG));
            }
B
bessani@gmail.com 已提交
345

346 347 348 349
            tomLayer.getCommunication().updateServersConnections();
        } else {
            receiver.restart();
        }
B
bessani@gmail.com 已提交
350 351
    }

352 353
    public void shutdown() {
        this.doWork = false;
354
        
355
        logger.info("Shutting down delivery thread");
356 357 358 359
        
        decidedLock.lock();        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
360
    }
361 362 363 364
    
    public int size() {
        return decided.size();
    }
P
pjsousa@gmail.com 已提交
365
}