DeliveryThread.java 12.1 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14 15
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
P
pjsousa@gmail.com 已提交
17

18
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
19 20
import java.util.concurrent.LinkedBlockingQueue;

21
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
24

25
import bftsmart.consensus.Decision;
26
import bftsmart.reconfiguration.ServerViewController;
27
import bftsmart.statemanagement.ApplicationState;
28
import bftsmart.tom.MessageContext;
29
import bftsmart.tom.ServiceReplica;
30 31
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
32
import bftsmart.tom.leaderchange.CertifiedDecision;
33 34 35
import bftsmart.tom.server.Recoverable;
import bftsmart.tom.util.BatchReader;
import bftsmart.tom.util.Logger;
P
pjsousa@gmail.com 已提交
36 37 38 39 40

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
41
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
42

43
    private LinkedBlockingQueue<Decision> decided = new LinkedBlockingQueue<>(); // decided from consensus
P
pjsousa@gmail.com 已提交
44
    private TOMLayer tomLayer; // TOM layer
45
    private ServiceReplica receiver; // Object that receives requests from clients
46
    private Recoverable recoverer; // Object that uses state transfer
47
    private ServerViewController controller;
48 49
    private Lock decidedLock = new ReentrantLock();
    private Condition notEmptyQueue = decidedLock.newCondition();
P
pjsousa@gmail.com 已提交
50 51 52 53 54

    /**
     * Creates a new instance of DeliveryThread. The thread is named "Delivery Thread" and is
     * not started by this constructor.
     *
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     * @param recoverer Object that uses state transfer
     * @param controller View controller consulted for the current view id, the static
     *                   configuration and pending reconfiguration updates
     */
    public DeliveryThread(TOMLayer tomLayer, ServiceReplica receiver, Recoverable recoverer, ServerViewController controller) {
        super("Delivery Thread");

        this.tomLayer = tomLayer;
        this.receiver = receiver;
        this.recoverer = recoverer;
        this.controller = controller;
    }

68 69 70 71 72
    
   /**
    * Gives access to the recoverer this thread was constructed with.
    *
    * @return the {@link Recoverable} passed to the constructor
    */
   public Recoverable getRecoverer() {
       return this.recoverer;
   }
   
P
pjsousa@gmail.com 已提交
73
    /**
74 75
     * Invoked by the TOM layer, to deliver a decision
     * @param dec Decision established from the consensus
P
pjsousa@gmail.com 已提交
76
     */
77 78
    public void delivery(Decision dec) {
        if (!containsGoodReconfig(dec)) {
79

80 81 82
            Logger.println("(DeliveryThread.delivery) Decision from consensus " + dec.getConsensusId() + " does not contain good reconfiguration");
            //set this decision as the last one from this replica
            tomLayer.setLastExec(dec.getConsensusId());
B
bessani@gmail.com 已提交
83 84
            //define that end of this execution
            tomLayer.setInExec(-1);
85
        } //else if (tomLayer.controller.getStaticConf().getProcessId() == 0) System.exit(0);
P
pjsousa@gmail.com 已提交
86
        try {
87
        	decidedLock.lock();
88
            decided.put(dec);
89 90
            
			// clean the ordered messages from the pending buffer
91
            TOMMessage[] requests = extractMessagesFromDecision(dec);
92 93
			tomLayer.clientsManager.requestsOrdered(requests);
            
94 95
            notEmptyQueue.signalAll();
            decidedLock.unlock();
96
            Logger.println("(DeliveryThread.delivery) Consensus " + dec.getConsensusId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
97 98 99 100
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
101

102 103
    private boolean containsGoodReconfig(Decision dec) {
        TOMMessage[] decidedMessages = dec.getDeserializedValue();
B
bessani@gmail.com 已提交
104 105

        for (TOMMessage decidedMessage : decidedMessages) {
106
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
107
                    && decidedMessage.getViewID() == controller.getCurrentViewId()) {
B
bessani@gmail.com 已提交
108 109 110 111 112 113
                return true;
            }
        }
        return false;
    }

R
reiser@cs.fau.de 已提交
114
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
115 116 117 118
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
119
    	// release the delivery lock to avoid blocking on state transfer
120 121 122 123
        decidedLock.lock();
        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
124
    	
125 126
        deliverLock.lock();
    }
127

128 129 130 131 132 133 134
    /**
     * Releases the delivery lock previously acquired with {@link #deliverLock()}.
     * Must be called by the thread that holds the lock.
     */
    public void deliverUnlock() {
        this.deliverLock.unlock();
    }

    /**
     * Wakes every thread waiting on the canDeliver condition (run() waits on it while the
     * TOM layer reports that state is being retrieved).
     * <p>
     * The condition belongs to the delivery lock, so the caller must hold that lock
     * (see {@link #deliverLock()}); a ReentrantLock condition throws
     * IllegalMonitorStateException when signalled without it.
     */
    public void canDeliver() {
        this.canDeliver.signalAll();
    }
135

136
    public void update(ApplicationState state) {
137
       
138
        int lastCID =  recoverer.setState(state);
139

140
        //set this decision as the last one from this replica
141 142
        System.out.println("Setting last CID to " + lastCID);
        tomLayer.setLastExec(lastCID);
143 144 145

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
146 147
        if (lastCID > 2) {
            int stableConsensus = lastCID - 3;
148
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
149 150 151
        }

        //define that end of this execution
152
        //stateManager.setWaiting(-1);
153
        tomLayer.setNoExec();
154

155
        System.out.print("Current decided size: " + decided.size());
156
        decided.clear();
157

158
        System.out.println("(DeliveryThread.update) All finished up to " + lastCID);
159 160
    }

P
pjsousa@gmail.com 已提交
161
    /**
162
     * This is the code for the thread. It delivers decisions to the TOM
B
bessani@gmail.com 已提交
163
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
164 165 166 167
     */
    @Override
    public void run() {
        while (true) {
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverLock();
            while (tomLayer.isRetrievingState()) {
                System.out.println("(DeliveryThread.run) Retrieving State.");
                canDeliver.awaitUninterruptibly();
                System.out.println("(DeliveryThread.run) canDeliver released.");
            }
            try {
                ArrayList<Decision> decisions = new ArrayList<Decision>();
                decidedLock.lock();
                if(decided.isEmpty()) {
                    notEmptyQueue.await();
                }
                decided.drainTo(decisions);
                decidedLock.unlock();
                if (decisions.size() > 0) {
                    TOMMessage[][] requests = new TOMMessage[decisions.size()][];
                    int[] consensusIds = new int[requests.length];
                    int[] leadersIds = new int[requests.length];
                    int[] regenciesIds = new int[requests.length];
                    CertifiedDecision[] proofs;
                    proofs = new CertifiedDecision[requests.length];
                    int count = 0;
                    for (Decision d : decisions) {
                        requests[count] = extractMessagesFromDecision(d);
                        consensusIds[count] = d.getConsensusId();
                        leadersIds[count] = d.getLeader();
                        regenciesIds[count] = d.getRegency();

                        CertifiedDecision led = new CertifiedDecision(this.controller.getStaticConf().getProcessId(),
                                d.getConsensusId(), d.getValue(), d.getDecisionEpoch().proof);
                        proofs[count] = led;

                        // cons.firstMessageProposed contains the performance counters
                        if (requests[count][0].equals(d.firstMessageProposed)) {
                            long time = requests[count][0].timestamp;
                            requests[count][0] = d.firstMessageProposed;
                            requests[count][0].timestamp = time;
                        }

                        count++;
                    }

                    Decision lastDecision = decisions.get(decisions.size() - 1);

                    if (requests != null && requests.length > 0) {
                        deliverMessages(consensusIds, regenciesIds, leadersIds, proofs, requests);

                        // ******* EDUARDO BEGIN ***********//
                        if (controller.hasUpdates()) {
                            processReconfigMessages(lastDecision.getConsensusId());

                            // set the consensus associated to the last decision as the last executed
                            tomLayer.setLastExec(lastDecision.getConsensusId());
                            // define that end of this execution
                            tomLayer.setInExec(-1);
                            // ******* EDUARDO END **************//
                        }
                    }

                    // define the last stable consensus... the stable consensus can
                    // be removed from the leaderManager and the executionManager
                    // TODO: Is this part necessary? If it is, can we put it
                    // inside setLastExec
                    int cid = lastDecision.getConsensusId();
                    if (cid > 2) {
                        int stableConsensus = cid - 3;

                        tomLayer.lm.removeStableConsenusInfos(stableConsensus);
                        tomLayer.execManager.removeConsensus(stableConsensus);
                    }
                }
            } catch (Exception e) {
                    e.printStackTrace(System.err);
            }

            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverUnlock();
            /******************************************************************/
        }
B
bessani@gmail.com 已提交
248
    }
249
    
250 251
    private TOMMessage[] extractMessagesFromDecision(Decision dec) {
    	TOMMessage[] requests = (TOMMessage[]) dec.getDeserializedValue();
252
    	if (requests == null) {
253 254 255
            // there are no cached deserialized requests
            // this may happen if this batch proposal was not verified
            // TODO: this condition is possible?
256

257
            Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");
258

259 260 261 262
            // obtain an array of requests from the decisions obtained
            BatchReader batchReader = new BatchReader(dec.getValue(),
                            controller.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(controller);
263
    	} else {
264
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
265 266 267
    	}

    	return requests;
B
bessani@gmail.com 已提交
268
    }
269
    
270
    protected void deliverUnordered(TOMMessage request, int regency) {
271 272 273 274 275
        MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
                request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature,
                System.currentTimeMillis(), 0, 0, regency, -1, -1, null, null, false); // Since the request is unordered,
                                                                                       // there is no consensus info to pass
        
276
        msgCtx.readOnly = true;
277
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
278 279
    }

280
    private void deliverMessages(int consId[], int regencies[], int leaders[], CertifiedDecision[] proofs, TOMMessage[][] requests) {
281
        receiver.receiveMessages(consId, regencies, leaders, proofs, requests);
B
bessani@gmail.com 已提交
282 283
    }

284 285
    private void processReconfigMessages(int consId) {
        byte[] response = controller.executeUpdates(consId);
286
        TOMMessage[] dests = controller.clearUpdates();
B
bessani@gmail.com 已提交
287 288 289

        for (int i = 0; i < dests.length; i++) {
            tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
290
                    new TOMMessage(controller.getStaticConf().getProcessId(),
B
bessani@gmail.com 已提交
291
                    dests[i].getSession(), dests[i].getSequence(), response,
292
                    controller.getCurrentViewId(),TOMMessageType.RECONFIG));
B
bessani@gmail.com 已提交
293 294 295 296 297
        }

        tomLayer.getCommunication().updateServersConnections();
    }

P
pjsousa@gmail.com 已提交
298
}