DeliveryThread.java 13.5 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14 15
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
P
pjsousa@gmail.com 已提交
17

18
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
19 20
import java.util.concurrent.LinkedBlockingQueue;

21
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
24

25
import bftsmart.consensus.Decision;
26
import bftsmart.reconfiguration.ServerViewController;
27
import bftsmart.statemanagement.ApplicationState;
28
import bftsmart.tom.MessageContext;
29
import bftsmart.tom.ServiceReplica;
30 31
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
32
import bftsmart.tom.leaderchange.CertifiedDecision;
33 34
import bftsmart.tom.server.Recoverable;
import bftsmart.tom.util.BatchReader;
35 36 37

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
P
pjsousa@gmail.com 已提交
38 39 40 41 42

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
43
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
44

45 46
    private Logger logger = LoggerFactory.getLogger(this.getClass());

47
    private boolean doWork = true;
48
    private boolean hadReconfig = false;
J
Joao Sousa 已提交
49 50 51 52 53 54 55
    private final LinkedBlockingQueue<Decision> decided; 
    private final TOMLayer tomLayer; // TOM layer
    private final ServiceReplica receiver; // Object that receives requests from clients
    private final Recoverable recoverer; // Object that uses state transfer
    private final ServerViewController controller;
    private final Lock decidedLock = new ReentrantLock();
    private final Condition notEmptyQueue = decidedLock.newCondition();
P
pjsousa@gmail.com 已提交
56 57 58 59 60 61

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     */
62
    public DeliveryThread(TOMLayer tomLayer, ServiceReplica receiver, Recoverable recoverer, ServerViewController controller) {
P
pjsousa@gmail.com 已提交
63
        super("Delivery Thread");
J
Joao Sousa 已提交
64
        this.decided = new LinkedBlockingQueue<>();
P
pjsousa@gmail.com 已提交
65 66 67

        this.tomLayer = tomLayer;
        this.receiver = receiver;
68
        this.recoverer = recoverer;
69
        //******* EDUARDO BEGIN **************//
70
        this.controller = controller;
71
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
72 73
    }

74 75 76 77 78
    
   public Recoverable getRecoverer() {
        return recoverer;
    }
   
P
pjsousa@gmail.com 已提交
79
    /**
80 81
     * Invoked by the TOM layer, to deliver a decision
     * @param dec Decision established from the consensus
P
pjsousa@gmail.com 已提交
82
     */
83
    public void delivery(Decision dec) {
84
        
P
pjsousa@gmail.com 已提交
85
        try {
J
Joao Sousa 已提交
86
            decidedLock.lock();
87
            decided.put(dec);
88
            
J
Joao Sousa 已提交
89
            // clean the ordered messages from the pending buffer
90
            TOMMessage[] requests = extractMessagesFromDecision(dec);
91
            tomLayer.clientsManager.requestsOrdered(requests);
92
            
93 94
            notEmptyQueue.signalAll();
            decidedLock.unlock();
95
            logger.debug("Consensus " + dec.getConsensusId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
96
        } catch (Exception e) {
97
            logger.error("Could not insert decision into decided queue",e);
P
pjsousa@gmail.com 已提交
98
        }
99
        
100 101 102
        hadReconfig = containsReconfig(dec);
        
        if (!hadReconfig) {
103 104 105 106 107 108 109

            logger.debug("Decision from consensus " + dec.getConsensusId() + " does not contain good reconfiguration");
            //set this decision as the last one from this replica
            tomLayer.setLastExec(dec.getConsensusId());
            //define that end of this execution
            tomLayer.setInExec(-1);
        } //else if (tomLayer.controller.getStaticConf().getProcessId() == 0) System.exit(0);
P
pjsousa@gmail.com 已提交
110
    }
B
bessani@gmail.com 已提交
111

112
    private boolean containsReconfig(Decision dec) {
113
        TOMMessage[] decidedMessages = dec.getDeserializedValue();
B
bessani@gmail.com 已提交
114 115

        for (TOMMessage decidedMessage : decidedMessages) {
116
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
117
                    && decidedMessage.getViewID() == controller.getCurrentViewId()) {
B
bessani@gmail.com 已提交
118 119 120 121 122 123
                return true;
            }
        }
        return false;
    }

R
reiser@cs.fau.de 已提交
124
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
125 126 127 128
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
129
    	// release the delivery lock to avoid blocking on state transfer
130 131 132 133
        decidedLock.lock();
        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
134
    	
135 136
        deliverLock.lock();
    }
137

138 139 140 141 142 143 144
    public void deliverUnlock() {
        deliverLock.unlock();
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
145

146
    public void update(ApplicationState state) {
147
       
148
        int lastCID =  recoverer.setState(state);
149

150
        //set this decision as the last one from this replica
151
        logger.info("Setting last CID to " + lastCID);
152
        tomLayer.setLastExec(lastCID);
153 154 155

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
156 157
        if (lastCID > 2) {
            int stableConsensus = lastCID - 3;
158
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
159 160 161
        }

        //define that end of this execution
162
        //stateManager.setWaiting(-1);
163
        tomLayer.setNoExec();
164

165
        logger.info("Current decided size: " + decided.size());
166
        decided.clear();
167

168
        logger.info("All finished up to " + lastCID);
169 170
    }

P
pjsousa@gmail.com 已提交
171
    /**
172
     * This is the code for the thread. It delivers decisions to the TOM
B
bessani@gmail.com 已提交
173
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
174 175 176
     */
    @Override
    public void run() {
177
        while (doWork) {
178 179 180
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverLock();
            while (tomLayer.isRetrievingState()) {
181
                logger.info("Retrieving State");
182
                canDeliver.awaitUninterruptibly();
S
snakejerusalem 已提交
183 184
                
                if (tomLayer.getLastExec() == -1)
185
                    logger.info("Ready to process operations");
186 187 188 189 190 191 192
            }
            try {
                ArrayList<Decision> decisions = new ArrayList<Decision>();
                decidedLock.lock();
                if(decided.isEmpty()) {
                    notEmptyQueue.await();
                }
193
                
194 195
                logger.debug("Current size of the decided queue: {}", decided.size());

196 197 198 199 200 201
                if (controller.getStaticConf().getSameBatchSize()) {
                    decided.drainTo(decisions, 1);
                } else {
                    decided.drainTo(decisions);
                }
                
202
                decidedLock.unlock();
203 204 205
                
                if (!doWork) break;
                
206 207 208 209 210
                if (decisions.size() > 0) {
                    TOMMessage[][] requests = new TOMMessage[decisions.size()][];
                    int[] consensusIds = new int[requests.length];
                    int[] leadersIds = new int[requests.length];
                    int[] regenciesIds = new int[requests.length];
211 212
                    CertifiedDecision[] cDecs;
                    cDecs = new CertifiedDecision[requests.length];
213 214 215 216 217 218 219
                    int count = 0;
                    for (Decision d : decisions) {
                        requests[count] = extractMessagesFromDecision(d);
                        consensusIds[count] = d.getConsensusId();
                        leadersIds[count] = d.getLeader();
                        regenciesIds[count] = d.getRegency();

220
                        CertifiedDecision cDec = new CertifiedDecision(this.controller.getStaticConf().getProcessId(),
221
                                d.getConsensusId(), d.getValue(), d.getDecisionEpoch().proof);
222
                        cDecs[count] = cDec;
223 224 225 226

                        // cons.firstMessageProposed contains the performance counters
                        if (requests[count][0].equals(d.firstMessageProposed)) {
                            long time = requests[count][0].timestamp;
227 228
                            long seed = requests[count][0].seed;
                            int numOfNonces = requests[count][0].numOfNonces;
229 230
                            requests[count][0] = d.firstMessageProposed;
                            requests[count][0].timestamp = time;
231 232
                            requests[count][0].seed = seed;
                            requests[count][0].numOfNonces = numOfNonces;
233 234 235 236 237 238 239 240
                        }

                        count++;
                    }

                    Decision lastDecision = decisions.get(decisions.size() - 1);

                    if (requests != null && requests.length > 0) {
241
                        deliverMessages(consensusIds, regenciesIds, leadersIds, cDecs, requests);
242 243 244 245

                        // ******* EDUARDO BEGIN ***********//
                        if (controller.hasUpdates()) {
                            processReconfigMessages(lastDecision.getConsensusId());
246 247 248
                        }
                        if (hadReconfig) {
                            
249 250 251 252 253
                            // set the consensus associated to the last decision as the last executed
                            tomLayer.setLastExec(lastDecision.getConsensusId());
                            // define that end of this execution
                            tomLayer.setInExec(-1);
                            // ******* EDUARDO END **************//
254 255
                            
                            hadReconfig = false;
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
                        }
                    }

                    // define the last stable consensus... the stable consensus can
                    // be removed from the leaderManager and the executionManager
                    // TODO: Is this part necessary? If it is, can we put it
                    // inside setLastExec
                    int cid = lastDecision.getConsensusId();
                    if (cid > 2) {
                        int stableConsensus = cid - 3;

                        tomLayer.execManager.removeConsensus(stableConsensus);
                    }
                }
            } catch (Exception e) {
271
                    logger.error("Error while processing decision",e);
272 273 274 275 276 277
            }

            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverUnlock();
            /******************************************************************/
        }
278
        logger.info("DeliveryThread stopped.");
279

B
bessani@gmail.com 已提交
280
    }
281
    
282 283
    private TOMMessage[] extractMessagesFromDecision(Decision dec) {
    	TOMMessage[] requests = (TOMMessage[]) dec.getDeserializedValue();
284
    	if (requests == null) {
285 286 287
            // there are no cached deserialized requests
            // this may happen if this batch proposal was not verified
            // TODO: this condition is possible?
288

289
            logger.debug("Interpreting and verifying batched requests.");
290

291 292 293 294
            // obtain an array of requests from the decisions obtained
            BatchReader batchReader = new BatchReader(dec.getValue(),
                            controller.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(controller);
295
    	} else {
296
            logger.debug("Using cached requests from the propose.");
297 298 299
    	}

    	return requests;
B
bessani@gmail.com 已提交
300
    }
301
    
302
    protected void deliverUnordered(TOMMessage request, int regency) {
S
snakejerusalem 已提交
303

304 305 306 307 308
        MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
                request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature,
                System.currentTimeMillis(), 0, 0, regency, -1, -1, null, null, false); // Since the request is unordered,
                                                                                       // there is no consensus info to pass
        
309
        msgCtx.readOnly = true;
310
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
311 312
    }

313 314
    private void deliverMessages(int consId[], int regencies[], int leaders[], CertifiedDecision[] cDecs, TOMMessage[][] requests) {
        receiver.receiveMessages(consId, regencies, leaders, cDecs, requests);
B
bessani@gmail.com 已提交
315 316
    }

317 318
    private void processReconfigMessages(int consId) {
        byte[] response = controller.executeUpdates(consId);
319
        TOMMessage[] dests = controller.clearUpdates();
B
bessani@gmail.com 已提交
320

321 322 323 324
        if (controller.getCurrentView().isMember(receiver.getId())) {
            for (int i = 0; i < dests.length; i++) {
                tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
                        new TOMMessage(controller.getStaticConf().getProcessId(),
325
                        dests[i].getSession(), dests[i].getSequence(), dests[i].getOperationId(), response,
326 327
                        controller.getCurrentViewId(),TOMMessageType.RECONFIG));
            }
B
bessani@gmail.com 已提交
328

329 330 331 332
            tomLayer.getCommunication().updateServersConnections();
        } else {
            receiver.restart();
        }
B
bessani@gmail.com 已提交
333 334
    }

335 336
    public void shutdown() {
        this.doWork = false;
337
        
338
        logger.info("Shutting down delivery thread");
339 340 341 342
        
        decidedLock.lock();        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
343
    }
344 345 346 347
    
    public int size() {
        return decided.size();
    }
P
pjsousa@gmail.com 已提交
348
}