DeliveryThread.java 12.3 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14 15
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
package bftsmart.tom.core;
P
pjsousa@gmail.com 已提交
17

18
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
19 20
import java.util.concurrent.LinkedBlockingQueue;

21
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
24

25
import bftsmart.consensus.Decision;
26
import bftsmart.reconfiguration.ServerViewController;
27
import bftsmart.statemanagement.ApplicationState;
28
import bftsmart.tom.MessageContext;
29
import bftsmart.tom.ServiceReplica;
30 31
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
32
import bftsmart.tom.leaderchange.CertifiedDecision;
33 34 35
import bftsmart.tom.server.Recoverable;
import bftsmart.tom.util.BatchReader;
import bftsmart.tom.util.Logger;
P
pjsousa@gmail.com 已提交
36 37 38 39 40

/**
 * This class implements a thread which will deliver totally ordered requests to the application
 * 
 */
B
bessani@gmail.com 已提交
41
public final class DeliveryThread extends Thread {
P
pjsousa@gmail.com 已提交
42

J
Joao Sousa 已提交
43 44 45 46 47 48 49
    private final LinkedBlockingQueue<Decision> decided; 
    private final TOMLayer tomLayer; // TOM layer
    private final ServiceReplica receiver; // Object that receives requests from clients
    private final Recoverable recoverer; // Object that uses state transfer
    private final ServerViewController controller;
    private final Lock decidedLock = new ReentrantLock();
    private final Condition notEmptyQueue = decidedLock.newCondition();
P
pjsousa@gmail.com 已提交
50 51 52 53 54 55

    /**
     * Creates a new instance of DeliveryThread
     * @param tomLayer TOM layer
     * @param receiver Object that receives requests from clients
     */
56
    public DeliveryThread(TOMLayer tomLayer, ServiceReplica receiver, Recoverable recoverer, ServerViewController controller) {
P
pjsousa@gmail.com 已提交
57
        super("Delivery Thread");
J
Joao Sousa 已提交
58
        this.decided = new LinkedBlockingQueue<>();
P
pjsousa@gmail.com 已提交
59 60 61

        this.tomLayer = tomLayer;
        this.receiver = receiver;
62
        this.recoverer = recoverer;
63
        //******* EDUARDO BEGIN **************//
64
        this.controller = controller;
65
        //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
66 67
    }

68 69 70 71 72
    
   public Recoverable getRecoverer() {
        return recoverer;
    }
   
P
pjsousa@gmail.com 已提交
73
    /**
74 75
     * Invoked by the TOM layer, to deliver a decision
     * @param dec Decision established from the consensus
P
pjsousa@gmail.com 已提交
76
     */
77 78
    public void delivery(Decision dec) {
        if (!containsGoodReconfig(dec)) {
79

80 81 82
            Logger.println("(DeliveryThread.delivery) Decision from consensus " + dec.getConsensusId() + " does not contain good reconfiguration");
            //set this decision as the last one from this replica
            tomLayer.setLastExec(dec.getConsensusId());
B
bessani@gmail.com 已提交
83 84
            //define that end of this execution
            tomLayer.setInExec(-1);
85
        } //else if (tomLayer.controller.getStaticConf().getProcessId() == 0) System.exit(0);
P
pjsousa@gmail.com 已提交
86
        try {
J
Joao Sousa 已提交
87
            decidedLock.lock();
88
            decided.put(dec);
89
            
J
Joao Sousa 已提交
90
            // clean the ordered messages from the pending buffer
91
            TOMMessage[] requests = extractMessagesFromDecision(dec);
92 93
			tomLayer.clientsManager.requestsOrdered(requests);
            
94 95
            notEmptyQueue.signalAll();
            decidedLock.unlock();
96
            Logger.println("(DeliveryThread.delivery) Consensus " + dec.getConsensusId() + " finished. Decided size=" + decided.size());
P
pjsousa@gmail.com 已提交
97 98 99 100
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
B
bessani@gmail.com 已提交
101

102 103
    private boolean containsGoodReconfig(Decision dec) {
        TOMMessage[] decidedMessages = dec.getDeserializedValue();
B
bessani@gmail.com 已提交
104 105

        for (TOMMessage decidedMessage : decidedMessages) {
106
            if (decidedMessage.getReqType() == TOMMessageType.RECONFIG
107
                    && decidedMessage.getViewID() == controller.getCurrentViewId()) {
B
bessani@gmail.com 已提交
108 109 110 111 112 113
                return true;
            }
        }
        return false;
    }

R
reiser@cs.fau.de 已提交
114
    /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
115 116 117 118
    private ReentrantLock deliverLock = new ReentrantLock();
    private Condition canDeliver = deliverLock.newCondition();

    public void deliverLock() {
119
    	// release the delivery lock to avoid blocking on state transfer
120 121 122 123
        decidedLock.lock();
        
        notEmptyQueue.signalAll();
        decidedLock.unlock();
124
    	
125 126
        deliverLock.lock();
    }
127

128 129 130 131 132 133 134
    public void deliverUnlock() {
        deliverLock.unlock();
    }

    public void canDeliver() {
        canDeliver.signalAll();
    }
135

136
    public void update(ApplicationState state) {
137
       
138
        int lastCID =  recoverer.setState(state);
139

140
        //set this decision as the last one from this replica
141 142
        System.out.println("Setting last CID to " + lastCID);
        tomLayer.setLastExec(lastCID);
143 144 145

        //define the last stable consensus... the stable consensus can
        //be removed from the leaderManager and the executionManager
146 147
        if (lastCID > 2) {
            int stableConsensus = lastCID - 3;
148
            tomLayer.execManager.removeOutOfContexts(stableConsensus);
149 150 151
        }

        //define that end of this execution
152
        //stateManager.setWaiting(-1);
153
        tomLayer.setNoExec();
154

155
        System.out.print("Current decided size: " + decided.size());
156
        decided.clear();
157

158
        System.out.println("(DeliveryThread.update) All finished up to " + lastCID);
159 160
    }

P
pjsousa@gmail.com 已提交
161
    /**
162
     * This is the code for the thread. It delivers decisions to the TOM
B
bessani@gmail.com 已提交
163
     * request receiver object (which is the application)
P
pjsousa@gmail.com 已提交
164 165 166 167
     */
    @Override
    public void run() {
        while (true) {
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverLock();
            while (tomLayer.isRetrievingState()) {
                System.out.println("(DeliveryThread.run) Retrieving State.");
                canDeliver.awaitUninterruptibly();
                System.out.println("(DeliveryThread.run) canDeliver released.");
            }
            try {
                ArrayList<Decision> decisions = new ArrayList<Decision>();
                decidedLock.lock();
                if(decided.isEmpty()) {
                    notEmptyQueue.await();
                }
                decided.drainTo(decisions);
                decidedLock.unlock();
                if (decisions.size() > 0) {
                    TOMMessage[][] requests = new TOMMessage[decisions.size()][];
                    int[] consensusIds = new int[requests.length];
                    int[] leadersIds = new int[requests.length];
                    int[] regenciesIds = new int[requests.length];
188 189
                    CertifiedDecision[] cDecs;
                    cDecs = new CertifiedDecision[requests.length];
190 191 192 193 194 195 196
                    int count = 0;
                    for (Decision d : decisions) {
                        requests[count] = extractMessagesFromDecision(d);
                        consensusIds[count] = d.getConsensusId();
                        leadersIds[count] = d.getLeader();
                        regenciesIds[count] = d.getRegency();

197
                        CertifiedDecision cDec = new CertifiedDecision(this.controller.getStaticConf().getProcessId(),
198
                                d.getConsensusId(), d.getValue(), d.getDecisionEpoch().proof);
199
                        cDecs[count] = cDec;
200 201 202 203

                        // cons.firstMessageProposed contains the performance counters
                        if (requests[count][0].equals(d.firstMessageProposed)) {
                            long time = requests[count][0].timestamp;
204 205
                            long seed = requests[count][0].seed;
                            int numOfNonces = requests[count][0].numOfNonces;
206 207
                            requests[count][0] = d.firstMessageProposed;
                            requests[count][0].timestamp = time;
208 209
                            requests[count][0].seed = seed;
                            requests[count][0].numOfNonces = numOfNonces;
210 211 212 213 214 215 216 217
                        }

                        count++;
                    }

                    Decision lastDecision = decisions.get(decisions.size() - 1);

                    if (requests != null && requests.length > 0) {
218
                        deliverMessages(consensusIds, regenciesIds, leadersIds, cDecs, requests);
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250

                        // ******* EDUARDO BEGIN ***********//
                        if (controller.hasUpdates()) {
                            processReconfigMessages(lastDecision.getConsensusId());

                            // set the consensus associated to the last decision as the last executed
                            tomLayer.setLastExec(lastDecision.getConsensusId());
                            // define that end of this execution
                            tomLayer.setInExec(-1);
                            // ******* EDUARDO END **************//
                        }
                    }

                    // define the last stable consensus... the stable consensus can
                    // be removed from the leaderManager and the executionManager
                    // TODO: Is this part necessary? If it is, can we put it
                    // inside setLastExec
                    int cid = lastDecision.getConsensusId();
                    if (cid > 2) {
                        int stableConsensus = cid - 3;

                        tomLayer.execManager.removeConsensus(stableConsensus);
                    }
                }
            } catch (Exception e) {
                    e.printStackTrace(System.err);
            }

            /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
            deliverUnlock();
            /******************************************************************/
        }
B
bessani@gmail.com 已提交
251
    }
252
    
253 254
    private TOMMessage[] extractMessagesFromDecision(Decision dec) {
    	TOMMessage[] requests = (TOMMessage[]) dec.getDeserializedValue();
255
    	if (requests == null) {
256 257 258
            // there are no cached deserialized requests
            // this may happen if this batch proposal was not verified
            // TODO: this condition is possible?
259

260
            Logger.println("(DeliveryThread.run) interpreting and verifying batched requests.");
261

262 263 264 265
            // obtain an array of requests from the decisions obtained
            BatchReader batchReader = new BatchReader(dec.getValue(),
                            controller.getStaticConf().getUseSignatures() == 1);
            requests = batchReader.deserialiseRequests(controller);
266
    	} else {
267
            Logger.println("(DeliveryThread.run) using cached requests from the propose.");
268 269 270
    	}

    	return requests;
B
bessani@gmail.com 已提交
271
    }
272
    
273
    protected void deliverUnordered(TOMMessage request, int regency) {
274 275 276 277 278
        MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
                request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature,
                System.currentTimeMillis(), 0, 0, regency, -1, -1, null, null, false); // Since the request is unordered,
                                                                                       // there is no consensus info to pass
        
279
        msgCtx.readOnly = true;
280
        receiver.receiveReadonlyMessage(request, msgCtx);
B
bessani@gmail.com 已提交
281 282
    }

283 284
    private void deliverMessages(int consId[], int regencies[], int leaders[], CertifiedDecision[] cDecs, TOMMessage[][] requests) {
        receiver.receiveMessages(consId, regencies, leaders, cDecs, requests);
B
bessani@gmail.com 已提交
285 286
    }

287 288
    private void processReconfigMessages(int consId) {
        byte[] response = controller.executeUpdates(consId);
289
        TOMMessage[] dests = controller.clearUpdates();
B
bessani@gmail.com 已提交
290 291 292

        for (int i = 0; i < dests.length; i++) {
            tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
293
                    new TOMMessage(controller.getStaticConf().getProcessId(),
B
bessani@gmail.com 已提交
294
                    dests[i].getSession(), dests[i].getSequence(), response,
295
                    controller.getCurrentViewId(),TOMMessageType.RECONFIG));
B
bessani@gmail.com 已提交
296 297 298 299 300
        }

        tomLayer.getCommunication().updateServersConnections();
    }

P
pjsousa@gmail.com 已提交
301
}