DeliveryThread.java 13.0 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14 15
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
P
pjsousa@gmail.com 已提交
17

18
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
19 20
import java.util.concurrent.LinkedBlockingQueue;

21
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
24
import java.util.logging.Level;
25

26
import bftsmart.consensus.Decision;
27
import bftsmart.reconfiguration.ServerViewController;
28
import bftsmart.statemanagement.ApplicationState;
29
import bftsmart.tom.MessageContext;
30
import bftsmart.tom.ServiceReplica;
31 32
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
33
import bftsmart.tom.leaderchange.CertifiedDecision;
34 35 36
import bftsmart.tom.server.Recoverable;
import bftsmart.tom.util.BatchReader;
import bftsmart.tom.util.Logger;
P
pjsousa@gmail.com 已提交
37 38 39 40 41

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
42
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
43

44
    private boolean doWork = true;
J
Joao Sousa 已提交
45 46 47 48 49 50 51
    private final LinkedBlockingQueue<Decision> decided; 
    private final TOMLayer tomLayer; // TOM layer
    private final ServiceReplica receiver; // Object that receives requests from clients
    private final Recoverable recoverer; // Object that uses state transfer
    private final ServerViewController controller;
    private final Lock decidedLock = new ReentrantLock();
    private final Condition notEmptyQueue = decidedLock.newCondition();
P
pjsousa@gmail.com 已提交
52 53 54 55 56 57

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     */
58
    public DeliveryThread(TOMLayer tomLayer, ServiceReplica receiver, Recoverable recoverer, ServerViewController controller) {
P
pjsousa@gmail.com 已提交
59
        super("Delivery Thread");
J
Joao Sousa 已提交
60
        this.decided = new LinkedBlockingQueue<>();
P
pjsousa@gmail.com 已提交
61 62 63

        this.tomLayer = tomLayer;
        this.receiver = receiver;
64
        this.recoverer = recoverer;
65
        //******* EDUARDO BEGIN **************//
66
        this.controller = controller;
67
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
68 69
    }

70 71 72 73 74
    
   public Recoverable getRecoverer() {
        return recoverer;
    }
   
P
pjsousa@gmail.com 已提交
75
    /**
76 77
     * Invoked by the TOM layer, to deliver a decision
     * @param dec Decision established from the consensus
P
pjsousa@gmail.com 已提交
78
     */
79 80
    public void delivery(Decision dec) {
        if (!containsGoodReconfig(dec)) {
81

82 83 84
            Logger.println("(DeliveryThread.delivery) Decision from consensus " + dec.getConsensusId() + " does not contain good reconfiguration");
            //set this decision as the last one from this replica
            tomLayer.setLastExec(dec.getConsensusId());
B
bessani@gmail.com 已提交
85 86
            //define that end of this execution
            tomLayer.setInExec(-1);
87
        } //else if (tomLayer.controller.getStaticConf().getProcessId() == 0) System.exit(0);
P
pjsousa@gmail.com 已提交
88
        try {
J
Joao Sousa 已提交
89
            decidedLock.lock();
90
            decided.put(dec);
91
            
J
Joao Sousa 已提交
92
            // clean the ordered messages from the pending buffer
93
            TOMMessage[] requests = extractMessagesFromDecision(dec);
94
            tomLayer.clientsManager.requestsOrdered(requests);
95
            
96 97
            notEmptyQueue.signalAll();
            decidedLock.unlock();
98
            Logger.println("(DeliveryThread.delivery) Consensus " + dec.getConsensusId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
99 100 101 102
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
103

104 105
    private boolean containsGoodReconfig(Decision dec) {
        TOMMessage[] decidedMessages = dec.getDeserializedValue();
B
bessani@gmail.com 已提交
106 107

        for (TOMMessage decidedMessage : decidedMessages) {
108
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
109
                    && decidedMessage.getViewID() == controller.getCurrentViewId()) {
B
bessani@gmail.com 已提交
110 111 112 113 114 115
                return true;
            }
        }
        return false;
    }

R
reiser@cs.fau.de 已提交
116
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
117 118 119 120
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
121
    	// release the delivery lock to avoid blocking on state transfer
122 123 124 125
        decidedLock.lock();
        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
126
    	
127 128
        deliverLock.lock();
    }
129

130 131 132 133 134 135 136
    public void deliverUnlock() {
        deliverLock.unlock();
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
137

138
    public void update(ApplicationState state) {
139
       
140
        int lastCID =  recoverer.setState(state);
141

142
        //set this decision as the last one from this replica
143 144
        System.out.println("Setting last CID to " + lastCID);
        tomLayer.setLastExec(lastCID);
145 146 147

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
148 149
        if (lastCID > 2) {
            int stableConsensus = lastCID - 3;
150
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
151 152 153
        }

        //define that end of this execution
154
        //stateManager.setWaiting(-1);
155
        tomLayer.setNoExec();
156

157
        System.out.print("Current decided size: " + decided.size());
158
        decided.clear();
159

160
        System.out.println("(DeliveryThread.update) All finished up to " + lastCID);
161 162
    }

P
pjsousa@gmail.com 已提交
163
    /**
164
     * This is the code for the thread. It delivers decisions to the TOM
B
bessani@gmail.com 已提交
165
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
166 167 168
     */
    @Override
    public void run() {
169
        while (doWork) {
170 171 172
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverLock();
            while (tomLayer.isRetrievingState()) {
173
                System.out.println("-- Retrieving State");
174
                canDeliver.awaitUninterruptibly();
S
snakejerusalem 已提交
175 176
                
                if (tomLayer.getLastExec() == -1)
177
                    System.out.println("-- Ready to process operations");
178 179 180 181 182 183 184 185 186
            }
            try {
                ArrayList<Decision> decisions = new ArrayList<Decision>();
                decidedLock.lock();
                if(decided.isEmpty()) {
                    notEmptyQueue.await();
                }
                decided.drainTo(decisions);
                decidedLock.unlock();
187 188 189
                
                if (!doWork) break;
                
190 191 192 193 194
                if (decisions.size() > 0) {
                    TOMMessage[][] requests = new TOMMessage[decisions.size()][];
                    int[] consensusIds = new int[requests.length];
                    int[] leadersIds = new int[requests.length];
                    int[] regenciesIds = new int[requests.length];
195 196
                    CertifiedDecision[] cDecs;
                    cDecs = new CertifiedDecision[requests.length];
197 198 199 200 201 202 203
                    int count = 0;
                    for (Decision d : decisions) {
                        requests[count] = extractMessagesFromDecision(d);
                        consensusIds[count] = d.getConsensusId();
                        leadersIds[count] = d.getLeader();
                        regenciesIds[count] = d.getRegency();

204
                        CertifiedDecision cDec = new CertifiedDecision(this.controller.getStaticConf().getProcessId(),
205
                                d.getConsensusId(), d.getValue(), d.getDecisionEpoch().proof);
206
                        cDecs[count] = cDec;
207 208 209 210

                        // cons.firstMessageProposed contains the performance counters
                        if (requests[count][0].equals(d.firstMessageProposed)) {
                            long time = requests[count][0].timestamp;
211 212
                            long seed = requests[count][0].seed;
                            int numOfNonces = requests[count][0].numOfNonces;
213 214
                            requests[count][0] = d.firstMessageProposed;
                            requests[count][0].timestamp = time;
215 216
                            requests[count][0].seed = seed;
                            requests[count][0].numOfNonces = numOfNonces;
217 218 219 220 221 222 223 224
                        }

                        count++;
                    }

                    Decision lastDecision = decisions.get(decisions.size() - 1);

                    if (requests != null && requests.length > 0) {
225
                        deliverMessages(consensusIds, regenciesIds, leadersIds, cDecs, requests);
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257

                        // ******* EDUARDO BEGIN ***********//
                        if (controller.hasUpdates()) {
                            processReconfigMessages(lastDecision.getConsensusId());

                            // set the consensus associated to the last decision as the last executed
                            tomLayer.setLastExec(lastDecision.getConsensusId());
                            // define that end of this execution
                            tomLayer.setInExec(-1);
                            // ******* EDUARDO END **************//
                        }
                    }

                    // define the last stable consensus... the stable consensus can
                    // be removed from the leaderManager and the executionManager
                    // TODO: Is this part necessary? If it is, can we put it
                    // inside setLastExec
                    int cid = lastDecision.getConsensusId();
                    if (cid > 2) {
                        int stableConsensus = cid - 3;

                        tomLayer.execManager.removeConsensus(stableConsensus);
                    }
                }
            } catch (Exception e) {
                    e.printStackTrace(System.err);
            }

            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverUnlock();
            /******************************************************************/
        }
258 259
        java.util.logging.Logger.getLogger(DeliveryThread.class.getName()).log(Level.INFO, "DeliveryThread stopped.");

B
bessani@gmail.com 已提交
260
    }
261
    
262 263
    private TOMMessage[] extractMessagesFromDecision(Decision dec) {
    	TOMMessage[] requests = (TOMMessage[]) dec.getDeserializedValue();
264
    	if (requests == null) {
265 266 267
            // there are no cached deserialized requests
            // this may happen if this batch proposal was not verified
            // TODO: this condition is possible?
268

269
            Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");
270

271 272 273 274
            // obtain an array of requests from the decisions obtained
            BatchReader batchReader = new BatchReader(dec.getValue(),
                            controller.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(controller);
275
    	} else {
276
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
277 278 279
    	}

    	return requests;
B
bessani@gmail.com 已提交
280
    }
281
    
282
    protected void deliverUnordered(TOMMessage request, int regency) {
S
snakejerusalem 已提交
283

284 285 286 287 288
        MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
                request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature,
                System.currentTimeMillis(), 0, 0, regency, -1, -1, null, null, false); // Since the request is unordered,
                                                                                       // there is no consensus info to pass
        
289
        msgCtx.readOnly = true;
290
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
291 292
    }

293 294
    private void deliverMessages(int consId[], int regencies[], int leaders[], CertifiedDecision[] cDecs, TOMMessage[][] requests) {
        receiver.receiveMessages(consId, regencies, leaders, cDecs, requests);
B
bessani@gmail.com 已提交
295 296
    }

297 298
    private void processReconfigMessages(int consId) {
        byte[] response = controller.executeUpdates(consId);
299
        TOMMessage[] dests = controller.clearUpdates();
B
bessani@gmail.com 已提交
300

301 302 303 304
        if (controller.getCurrentView().isMember(receiver.getId())) {
            for (int i = 0; i < dests.length; i++) {
                tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
                        new TOMMessage(controller.getStaticConf().getProcessId(),
305
                        dests[i].getSession(), dests[i].getSequence(), dests[i].getOperationId(), response,
306 307
                        controller.getCurrentViewId(),TOMMessageType.RECONFIG));
            }
B
bessani@gmail.com 已提交
308

309 310 311 312
            tomLayer.getCommunication().updateServersConnections();
        } else {
            receiver.restart();
        }
B
bessani@gmail.com 已提交
313 314
    }

315 316
    public void shutdown() {
        this.doWork = false;
317 318 319 320 321 322
        
        System.out.println("Shutting down delivery thread");
        
        decidedLock.lock();        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
323
    }
P
pjsousa@gmail.com 已提交
324
}