/**
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bftsmart.consensus.roles;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.security.PrivateKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bftsmart.communication.MacKey;
import bftsmart.communication.ServerCommunicationSystem;
import bftsmart.consensus.Consensus;
import bftsmart.consensus.Epoch;
import bftsmart.consensus.app.BatchAppResult;
import bftsmart.consensus.app.ComputeCode;
import bftsmart.consensus.messages.ConsensusMessage;
import bftsmart.consensus.messages.MessageFactory;
import bftsmart.reconfiguration.ReplicaTopology;
import bftsmart.reconfiguration.ServerViewController;
import bftsmart.tom.core.ExecutionManager;
import bftsmart.tom.core.ReplyManager;
import bftsmart.tom.core.TOMLayer;
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
import bftsmart.tom.server.Replier;
import bftsmart.tom.server.defaultservices.DefaultRecoverable;
import bftsmart.tom.util.TOMUtil;

/**
 * This class represents the acceptor role in the consensus protocol. This class
 * works together with the TOMLayer class in order to supply an atomic multicast
 * service.
 *
 * @author Alysson Bessani
 */
public final class Acceptor {
	private static final Logger LOGGER = LoggerFactory.getLogger(Acceptor.class);

	// Maximum number of retry attempts
	private static final int MAX_RETRY_SIZE = 3;

	private int me; // This replica ID
	private ExecutionManager executionManager; // Execution manager of consensus's executions
	private MessageFactory factory; // Factory for PaW messages
	private ServerCommunicationSystem communication; // Replicas communication system
	private TOMLayer tomLayer; // TOM layer
	private ReplicaTopology topology;
	// private Cipher cipher;
//	private Mac mac;

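	// Queue of pending consensus messages and the work flag toggled by start()/shutdown(); the dedicated worker
	// thread that would consume this queue is commented out in this version, so messages are processed on the
	// delivering thread.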
	private LinkedBlockingQueue<ConsensusMessage> consensusMessageQueue = new LinkedBlockingQueue<>();
	private volatile boolean doWork = false;
//	private volatile Thread thrdWork;

	/**
	 * Creates a new instance of Acceptor.
	 * 
	 * @param communication Replicas communication system
	 * @param factory       Message factory for PaW messages
	 * @param topology      Server view controller that provides the replica topology
	 */
	public Acceptor(ServerCommunicationSystem communication, MessageFactory factory, ServerViewController topology) {
		this.communication = communication;
		this.me = topology.getStaticConf().getProcessId();
		this.factory = factory;
		this.topology = topology;
//		try {
//			// this.cipher = Cipher.getInstance("DES/ECB/PKCS5Padding");
//			// this.cipher = Cipher.getInstance(ServerConnection.MAC_ALGORITHM);
//			this.mac = Mac.getInstance(MessageConnection.MAC_ALGORITHM);
//		} catch (NoSuchAlgorithmException /* | NoSuchPaddingException */ ex) {
//			ex.printStackTrace();
//		}
	}

	public DefaultRecoverable getDefaultExecutor() {
		return (DefaultRecoverable) tomLayer.getDeliveryThread().getReceiver().getExecutor();
	}

	public Replier getBatchReplier() {
		return tomLayer.getDeliveryThread().getReceiver().getReplier();
	}

	public ReplyManager getReplyManager() {
		return tomLayer.getDeliveryThread().getReceiver().getRepMan();
	}

	public MessageFactory getFactory() {
		return factory;
	}

	/**
	 * Sets the execution manager for this acceptor
	 * 
	 * @param manager Execution manager for this acceptor
	 */
	public void setExecutionManager(ExecutionManager manager) {
		this.executionManager = manager;
	}

	/**
	 * Sets the TOM layer for this acceptor
	 * 
	 * @param tom TOM layer for this acceptor
	 */
	public void setTOMLayer(TOMLayer tom) {
		this.tomLayer = tom;
	}

	/**
	 * Called by the communication layer to deliver Paxos messages. This method only
	 * verifies if the message can be executed and calls processMessage (storing it
	 * in an out of context message buffer if this is not the case)
	 *
	 * @param msg Paxos messages delivered by the communication layer
	 */
	public final void deliver(ConsensusMessage msg) {
		if (executionManager.checkLimits(msg)) {
//            LOGGER.debug("processing paxos msg with id " + msg.getNumber());
			LOGGER.debug("processing paxos msg with id {}", msg.getNumber());
			processMessage(msg);
		} else {
//            LOGGER.debug("out of context msg with id " + msg.getNumber());
			LOGGER.debug("out of context msg with id {}", msg.getNumber());

			tomLayer.processOutOfContext();
			tomLayer.processOutOfContextWriteAndAccept();
		}
	}

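	/**
	 * Checks whether a consensus message carrying the given epoch timestamp is still acceptable for the given
	 * consensus instance. Messages older than the latest locally known epoch are dropped, and messages from a
	 * newer epoch (i.e. a leader change this replica missed) trigger a rollback of any uncommitted
	 * pre-computation so that the replica can later catch up through state transfer.
	 */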
	public boolean checkSucc(Consensus consensus, int msgEpoch) {

		Epoch latestEpochObj = consensus.getLastEpoch();

		if (latestEpochObj == null) {
			return true;
		}

		int latestEpoch = latestEpochObj.getTimestamp();

		// A leader change has taken place and this node took part in it, updating the timestamp of this
		// consensus; consensus messages that still carry the old timestamp are not processed any more.
		if (msgEpoch < latestEpoch) {
			LOGGER.info("I am proc {}, checkSucc false, msgEpoch = {}, latestEpoch = {}", topology.getStaticConf().getProcessId(), msgEpoch, latestEpoch);
			return false;
		}

		// This node missed the leader change because of a network problem; after the network recovers it
		// receives consensus messages carrying the advanced timestamp. This branch lets a node that did not
		// take part in the leader change catch up later through the state transfer procedure.
		if ((tomLayer.getInExec() == consensus.getId()) && (msgEpoch > latestEpoch)) {
			LOGGER.info("I am proc {}, checkSucc false, msgEpoch = {}, latestEpoch = {}", topology.getStaticConf().getProcessId(), msgEpoch, latestEpoch);
			// If this round of consensus has already completed and committed, the message is not processed;
			// if it has not been committed but a pre-computation was performed, the pre-computation is rolled back;
			// if this round never reached the pre-computation stage, messages of the new timestamp are not processed either.
			if (consensus.getPrecomputed() && !consensus.getPrecomputeCommited()) {
				Epoch epoch = consensus.getEpoch(latestEpoch, topology);
				consensus.lock.lock();
				try {
					getDefaultExecutor().preComputeRollback(consensus.getId(), epoch.getBatchId());
				} finally {
					consensus.lock.unlock();
				}
			}
			return false;
		}

		return true;
	}

	/**
	 * Called when a Consensus message is received or when an out of context message
	 * must be processed. It processes the received message according to its type
	 *
	 * @param msg The message to be processed
	 */
	public final void processMessage(ConsensusMessage msg) {
		Consensus consensus = executionManager.getConsensus(msg.getNumber());

		// Special handling added in this version; this should be revisited and optimized away later
//		if (msg.getType() != MessageFactory.PROPOSE && consensus.getLastEpoch() != null && consensus.getLastEpoch().getTimestamp() > msg.getEpoch()) {
//			msg = new ConsensusMessage(msg.getType(),msg.getNumber(),consensus.getLastEpoch().getTimestamp(), msg.getSender(), msg.getValue());
//		}

		// Check the epoch of the message
		if (!checkSucc(consensus, msg.getEpoch())) {
			LOGGER.info("I am proc {}, msg type = {}, processMessage checkSucc failed!", topology.getStaticConf().getProcessId(), msg.getType());
			return;
		}


		// Epoch corresponding to the timestamp carried by the received consensus message
		Epoch poch = consensus.getEpoch(msg.getEpoch(), topology);

		switch (msg.getType()) {
		case MessageFactory.PROPOSE: {
			while (doWork && (!tomLayer.isReady())) {
				LOGGER.warn("Wait for the node[{}] to be ready... ", topology.getCurrentProcessId());
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
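					// the interrupt is ignored; keep waiting until the node becomes ready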
				}
			}

			consensus.lock.lock();
			try {
				proposeReceived(poch, msg);
			} finally {
				consensus.lock.unlock();
			}
		}
			break;
		case MessageFactory.WRITE: {
			consensus.lock.lock();
			try {
				writeReceived(poch, msg.getSender(), msg.getValue());
			} finally {
				consensus.lock.unlock();
			}
		}
			break;
		case MessageFactory.ACCEPT: {
			consensus.lock.lock();
			try {
				acceptReceived(poch, msg);
			} finally {
				consensus.lock.unlock();
			}
		}
		}// End of : switch (msg.getType());
	}

	/**
	 * Called when a PROPOSE message is received or when processing a formerly out
	 * of context propose which is known to belong to the current consensus.
	 *
	 * @param msg The PROPOSE message to be processed
	 */
	private void proposeReceived(Epoch epoch, ConsensusMessage msg) {
		int cid = epoch.getConsensus().getId();
		int ts = epoch.getConsensus().getEts();
		int ets = executionManager.getConsensus(msg.getNumber()).getEts();
//    	LOGGER.debug("(Acceptor.proposeReceived) PROPOSE for consensus " + cid);

		LOGGER.debug("(Acceptor.proposeReceived) I am proc {}, PROPOSE for consensus {} ",
				topology.getStaticConf().getProcessId(), cid);

		if (msg.getSender() == executionManager.getCurrentLeader() // Is the replica the leader?
				&& epoch.getTimestamp() == 0 && ts == ets && ets == 0) { // Is all this in epoch 0?
			executePropose(epoch, msg.getValue());
		} else {
			LOGGER.error("Propose received is not from the expected leader");
		}
	}

//	private boolean isReady() {
//		if (tomLayer == null || tomLayer.getStateManager().isRetrievingState()) {
//			return false;
//		}
//
//		if (tomLayer == null || (!tomLayer.heartBeatTimer.isActived())) {
//			return false;
//		}
//		return true;
//	}

	/**
	 * Executes actions related to a proposed value.
	 *
	 * @param epoch the current epoch of the consensus
	 * @param value Value that is proposed
	 */
	private void executePropose(Epoch epoch, byte[] value) {

		try {
			int cid = epoch.getConsensus().getId();
			LOGGER.info("(Acceptor.executePropose) I am proc {}, executing propose for cid : {}, epoch timestamp: {}",
					topology.getStaticConf().getProcessId(), cid, epoch.getTimestamp());

			long consensusStartTime = System.nanoTime();

			if (epoch.propValue == null) { // only accept one propose per epoch
				epoch.propValue = value;
				epoch.propValueHash = tomLayer.computeHash(value);

				/*** LEADER CHANGE CODE ********/
				epoch.getConsensus().addWritten(value);
				LOGGER.debug(
						"(Acceptor.executePropose) I have written value {}, in consensus instance {}, with timestamp {}",
						Arrays.toString(epoch.propValueHash), cid, epoch.getConsensus().getEts());
				/*****************************************/

				// start this consensus if it is not already running
				if (cid == tomLayer.getLastExec() + 1) {
					tomLayer.setInExec(cid);
				}
				epoch.deserializedPropValue = tomLayer.checkProposedValue(value, true);
				if (epoch.deserializedPropValue != null && epoch.deserializedPropValue.length > 0) {
					epoch.setProposeTimestamp(epoch.deserializedPropValue[0].timestamp);
				}

				if (epoch.deserializedPropValue != null && !epoch.isWriteSetted(me)) {
					if (epoch.getConsensus().getDecision().firstMessageProposed == null) {
						epoch.getConsensus().getDecision().firstMessageProposed = epoch.deserializedPropValue[0];
					}
					if (epoch.getConsensus().getDecision().firstMessageProposed.consensusStartTime == 0) {
						epoch.getConsensus().getDecision().firstMessageProposed.consensusStartTime = consensusStartTime;

					}
					epoch.getConsensus().getDecision().firstMessageProposed.proposeReceivedTime = System.nanoTime();

					if (topology.getStaticConf().isBFT()) {
						LOGGER.debug("(Acceptor.executePropose) sending WRITE for {}", cid);

						epoch.setWrite(me, epoch.propValueHash);
						epoch.getConsensus().getDecision().firstMessageProposed.writeSentTime = System.nanoTime();

//                    System.out.println("I am proc " + controller.getStaticConf().getProcessId() + ", send write msg" + ", cid is " + cid);
						communication.send(this.topology.getCurrentViewOtherAcceptors(),
								factory.createWrite(cid, epoch.getTimestamp(), epoch.propValueHash));

						LOGGER.debug("(Acceptor.executePropose) WRITE sent for {}", cid);

						computeWrite(cid, epoch, epoch.propValueHash);

						LOGGER.debug("(Acceptor.executePropose) WRITE computed for {}", cid);

					} else {
						epoch.setAccept(me, epoch.propValueHash);
						epoch.getConsensus().getDecision().firstMessageProposed.writeSentTime = System.nanoTime();
						epoch.getConsensus().getDecision().firstMessageProposed.acceptSentTime = System.nanoTime();
						/**** LEADER CHANGE CODE! ******/
						LOGGER.debug(
								"(Acceptor.executePropose) [CFT Mode] Setting consensus {}, QuorumWrite tiemstamp to {} and value {}",
								cid, epoch.getConsensus().getEts(), Arrays.toString(epoch.propValueHash));
						epoch.getConsensus().setQuorumWrites(epoch.propValueHash);
						/*****************************************/

						communication.send(this.topology.getCurrentViewOtherAcceptors(),
								factory.createAccept(cid, epoch.getTimestamp(), epoch.propValueHash));

						computeAccept(cid, epoch, epoch.propValueHash);
					}
					executionManager.processOutOfContext(epoch.getConsensus());
				}
			}
		} catch (Throwable e) {
			e.printStackTrace();
		}

	}

	/**
	 * Called when a WRITE message is received
	 *
	 * @param epoch Epoch of the received message
	 * @param a     Replica that sent the message
	 * @param value Value sent in the message
	 */
	private void writeReceived(Epoch epoch, int a, byte[] value) {
		int cid = epoch.getConsensus().getId();
		LOGGER.debug("(Acceptor.writeAcceptReceived) WRITE from {} for consensus {}", a, cid);
		epoch.setWrite(a, value);

		computeWrite(cid, epoch, value);
	}

	/**
	 * merge byte array
	 * 
	 * @param prop    serialized prop value
	 * @param appHash app hash value
	 * @return the concatenation of prop followed by appHash
	 */
	public byte[] MergeByte(byte[] prop, byte[] appHash) {
		byte[] result = new byte[prop.length + appHash.length];
		System.arraycopy(prop, 0, result, 0, prop.length);
		System.arraycopy(appHash, 0, result, prop.length, appHash.length);
		return result;
	}

	/**
	 * Computes WRITE values according to the Byzantine consensus specification
	 * (based on the values received).
	 *
	 * @param cid   Consensus ID of the received message
	 * @param epoch Epoch of the received message
	 * @param value Value sent in the message
	 */
	private void computeWrite(int cid, Epoch epoch, byte[] value) {
		try {
			int writeAccepted = epoch.countWrite(value);

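			// More than a quorum of WRITEs for the same value: if that value also matches this replica's own
			// proposal hash and no ACCEPT has been sent yet, pre-compute the batch and broadcast an ACCEPT.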
			if (writeAccepted > topology.getQuorum()) {
				LOGGER.info("(Acceptor.computeWrite) I am proc {}, I have {} WRITEs for cid {}, epoch timestamp {}",
						this.topology.getStaticConf().getProcessId(), writeAccepted, cid, epoch.getTimestamp());

//            System.out.println("(computeWrite) I am proc " + controller.getStaticConf().getProcessId() + ", my propose value hash is " + epoch.propValueHash + ", recv propose hash is "+ value + ", cid is " + cid + ", epoch is " + epoch.getTimestamp());

				if (!epoch.isAcceptSetted(me) && Arrays.equals(value, epoch.propValueHash)) {

					LOGGER.debug("(Acceptor.computeWrite) I am proc {} sending WRITE for {}",
							this.topology.getStaticConf().getProcessId(), cid);

					/**** LEADER CHANGE CODE! ******/
					LOGGER.debug(
							"(Acceptor.computeWrite) Setting consensus {} , QuorumWrite tiemstamp to {} and value {}",
							cid, epoch.getConsensus().getEts(), Arrays.toString(value));
					epoch.getConsensus().setQuorumWrites(value);
					/*****************************************/

					if (epoch.getConsensus().getDecision().firstMessageProposed != null) {

						epoch.getConsensus().getDecision().firstMessageProposed.acceptSentTime = System.nanoTime();
					}

					// add to implement application consistency
					if (topology.getStaticConf().isBFT()) {

						DefaultRecoverable defaultExecutor = getDefaultExecutor();
//                        byte[][] commands = new byte[epoch.deserializedPropValue.length][];
						List<byte[]> commands = new ArrayList<byte[]>();

						for (int i = 0; i < epoch.deserializedPropValue.length; i++) {
							// Requests whose view ID lags behind this node's current view, as well as Reconfig requests, are not pre-computed
							if (ViewIdBackWard(epoch.deserializedPropValue[i])
									|| isReconfig(epoch.deserializedPropValue[i])) {
								continue;
							}
							// Only requests with an up-to-date view ID go through the pre-computation below
							commands.add(epoch.deserializedPropValue[i].getContent());
							epoch.deserializedPrecomputeValue.add(epoch.deserializedPropValue[i]);
						}

						LOGGER.info("I am proc {}, start pre compute , cid = {}, epoch = {}", this.topology.getStaticConf().getProcessId(), cid, epoch.getTimestamp());
						BatchAppResult appHashResult = defaultExecutor.preComputeHash(cid,
								commands.toArray(new byte[commands.size()][]), epoch.getProposeTimestamp());

						byte[] result = MergeByte(epoch.propValue, appHashResult.getAppHashBytes());
						epoch.propAndAppValue = result;

						epoch.propAndAppValueHash = tomLayer.computeHash(result);

						epoch.preComputeRes = appHashResult.getComputeCode();

						epoch.commonHash = appHashResult.getGenisHashBytes();

						tomLayer.getExecManager().getConsensus(cid).setPrecomputed(true);

						epoch.setAsyncResponseLinkedList(appHashResult.getAsyncResponses());

						epoch.batchId = appHashResult.getBatchId();

						epoch.setAccept(me, epoch.propAndAppValueHash);

						ConsensusMessage cm = factory.createAccept(cid, epoch.getTimestamp(),
								epoch.propAndAppValueHash);

						// add origin propose hash for accept type consensus msg
						cm.setOrigPropValue(epoch.propValueHash);

						// Create a cryptographic proof for this ACCEPT message
						LOGGER.debug(
								"(Acceptor.computeWrite) Creating cryptographic proof for my ACCEPT message from consensus {}",
								cid);
						insertProof(cm, epoch);

						int[] targets = this.topology.getCurrentViewOtherAcceptors();
//                    System.out.println("I am proc " + controller.getStaticConf().getProcessId() + ", send accept msg" + ", cid is "+cid);
						communication.send(targets, cm);
//                    communication.getServersConn().send(targets, cm, true);

						epoch.addToProof(cm);
						computeAccept(cid, epoch, epoch.propAndAppValueHash);
					} else {
						epoch.setAccept(me, value);

						ConsensusMessage cm = factory.createAccept(cid, epoch.getTimestamp(), value);

						// Create a cryptographic proof for this ACCEPT message
						LOGGER.debug(
								"(Acceptor.computeWrite) Creating cryptographic proof for my ACCEPT message from consensus {}",
								cid);
						insertProof(cm, epoch);

505
						int[] targets = this.topology.getCurrentViewOtherAcceptors();
						communication.getServersCommunication().send(targets, cm, true);

						// communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
						// factory.createStrong(cid, epoch.getNumber(), value));
						epoch.addToProof(cm);
						computeAccept(cid, epoch, value);

					}
				}
			}
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}

	/**
	 * Create a cryptographic proof for a consensus message
	 * 
	 * This method modifies the consensus message passed as an argument, so that it
	 * contains a cryptographic proof.
	 * 
	 * @param cm    The consensus message to which the proof shall be set
	 * @param epoch The epoch in which the consensus message was created
	 */
	private void insertProof(ConsensusMessage cm, Epoch epoch) {
		ByteArrayOutputStream bOut = new ByteArrayOutputStream(248);
		try {
			new ObjectOutputStream(bOut).writeObject(cm);
		} catch (IOException ex) {
			ex.printStackTrace();
		}

		byte[] data = bOut.toByteArray();

		// check if consensus contains reconfiguration request
		TOMMessage[] msgs = epoch.deserializedPropValue;
		boolean hasReconf = false;

		for (TOMMessage msg : msgs) {
			if (msg.getReqType() == TOMMessageType.RECONFIG && msg.getViewID() == topology.getCurrentViewId()) {
				hasReconf = true;
				break; // no need to continue, exit the loop
			}
		}

		// If this consensus contains a reconfiguration request, we need to use
		// signatures (there might be replicas that will not be part of the next
		// consensus instance, and so their MAC will be outdated and useless)
		if (hasReconf) {

			PrivateKey RSAprivKey = topology.getStaticConf().getRSAPrivateKey();

			byte[] signature = TOMUtil.signMessage(RSAprivKey, data);

			cm.setProof(signature);

		} else { // ... if not, we can use MAC vectors
			int[] processes = this.topology.getCurrentViewProcesses();

			HashMap<Integer, byte[]> macVector = new HashMap<>();

			for (int id : processes) {
				if (id == me) {
					continue;
				}
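				// Obtain the shared MAC key with replica 'id' from the communication layer, retrying a few
				// times in case the key exchange with that replica has not completed yet.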
				int retrySize = 0;
				try {
//					SecretKey key = null;
					MacKey macKey = null;
					while (retrySize < MAX_RETRY_SIZE) {

//						key = communication.getServersCommunication().getSecretKey(id);
						macKey = tomLayer.getCommunication().getServersCommunication()
								.getMacKey(id);
						if (macKey == null) {
							LOGGER.error("(Acceptor.insertProof) I don't have yet a secret key with {} . Retrying.",
									id);
							retrySize++;
							Thread.sleep(10);
						} else {
							break;
						}
					}
					if (macKey != null) {
//						this.mac.init(key);
						byte[] macBytes = macKey.generateMac(data);
						macVector.put(id, macBytes);
					}
//
//                    do {
//                        key = communication.getServersConn().getSecretKey(id);
//                        if (key == null) {
//                            LOGGER.error("(Acceptor.insertProof) I don't have yet a secret key with {} . Retrying.", id);
//                            Thread.sleep(1000);
//                        }
//
//                    } while (key == null);  // JCS: This loop is to solve a race condition where a
//                                            // replica might have already been insert in the view or
//                                            // recovered after a crash, but it still did not concluded
//                                            // the diffie helman protocol. Not an elegant solution,
//                                            // but for now it will do
//                    this.mac.init(key);
//                    macVector.put(id, this.mac.doFinal(data));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			}

			cm.setProof(macVector);
		}

	}

	/**
	 * Called when an ACCEPT message is received
	 * 
	 * @param epoch Epoch of the received message
	 * @param msg   The ACCEPT message that was received
	 */
	private void acceptReceived(Epoch epoch, ConsensusMessage msg) {
		int cid = epoch.getConsensus().getId();
		LOGGER.debug("(Acceptor.acceptReceived) ACCEPT from {} for consensus {}", msg.getSender(), cid);
		epoch.setAccept(msg.getSender(), msg.getValue());
		epoch.addToProof(msg);

		computeAccept(cid, epoch, msg.getValue());
	}

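	/**
	 * Resets the execution state of the current consensus when it cannot be decided normally: the proposed
	 * requests (if any) are handed back to the clients manager as pending, lastExec is advanced to the consensus
	 * id currently in execution, and the in-execution marker is cleared.
	 */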
	private void updateConsensusSetting(Epoch epoch) {

		TOMMessage[] requests = epoch.deserializedPropValue;

		if (requests == null) {
			tomLayer.setLastExec(tomLayer.getInExec());

			tomLayer.setInExec(-1);
			return;
		}

		tomLayer.clientsManager.requestsPending(requests);

		tomLayer.setLastExec(tomLayer.getInExec());

		tomLayer.setInExec(-1);

	}

//    private void createResponses(Epoch epoch,  List<byte[]> updatedResp) {
//
//        TOMMessage[] requests = epoch.deserializedPrecomputeValue.toArray(new TOMMessage[epoch.deserializedPrecomputeValue.size()]);
//
//        Replier replier = getBatchReplier();
//
//        ReplyManager repMan = getReplyManager();
//
//        for (int index = 0; index < requests.length; index++) {
//            TOMMessage request = requests[index];
//            request.reply = new TOMMessage(me, request.getSession(), request.getSequence(),
//                    request.getOperationId(), updatedResp.get(index), controller.getCurrentViewId(),
//                    request.getReqType());
//
//            if (controller.getStaticConf().getNumRepliers() > 0) {
//                LOGGER.debug("(ServiceReplica.receiveMessages) sending reply to {} with sequence number {} and and operation ID {} via ReplyManager", request.getSender(), request.getSequence()
//                        , request.getOperationId());
//                repMan.send(request);
//            } else {
//                LOGGER.debug("(ServiceReplica.receiveMessages) sending reply to {} with sequence number {} and operation ID {}"
//                        , request.getSender(), request.getSequence(), request.getOperationId());
//                replier.manageReply(request, null);
//                // cs.send(new int[]{request.getSender()}, request.reply);
//            }
//        }
//
//    }
	/**
	 * Computes ACCEPT values according to the Byzantine consensus specification
	 * 
	 * @param cid   Consensus ID of the received message
	 * @param epoch Epoch of the received message
	 * @param value Value sent in the message
	 */
	private void computeAccept(int cid, Epoch epoch, byte[] value) {
		try {
			List<byte[]> updatedResp;

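			// Decide once more than a quorum of matching ACCEPTs has been collected and this consensus has not
			// been decided yet; depending on the pre-computation result the batch is committed or rolled back.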
			if (epoch.countAccept(value) > topology.getQuorum() && !epoch.getConsensus().isDecided()) {
				LOGGER.info("(Acceptor.computeAccept) I am proc {}, I have {} ACCEPTs for cid {} and timestamp {}",
						topology.getStaticConf().getProcessId(), epoch.countAccept(value), cid, epoch.getTimestamp());
				if (Arrays.equals(value, epoch.propAndAppValueHash)
						&& (ComputeCode.valueOf(epoch.getPreComputeRes()) == ComputeCode.SUCCESS)) {
					LOGGER.debug("(Acceptor.computeAccept) I am proc {}. Deciding {} ",
							topology.getStaticConf().getProcessId(), cid);
					try {
						LOGGER.info("(Acceptor.computeAccept) I am proc {}, I will write cid {} 's propse to ledger",
								topology.getStaticConf().getProcessId(), cid);
						// Commit is performed only when a pre-computation actually took place; requests with a stale
						// view ID and view-reconfiguration requests were never pre-computed and need no commit
						getDefaultExecutor().preComputeCommit(cid, epoch.getBatchId());
						tomLayer.getExecManager().getConsensus(cid).setPrecomputeCommited(true);
						decide(epoch);
					} catch (Exception e) {
						// maybe storage exception
						LOGGER.error("I am proc {} , flush storage fail, will rollback!",
								topology.getStaticConf().getProcessId());
						getDefaultExecutor().preComputeRollback(cid, epoch.getBatchId());
						updateConsensusSetting(epoch);
						updatedResp = getDefaultExecutor().updateResponses(epoch.getAsyncResponseLinkedList(),
								epoch.commonHash, false);
						epoch.setAsyncResponseLinkedList(updatedResp);
						decide(epoch);
					}
				} else if (Arrays.equals(value, epoch.propAndAppValueHash)
						&& (ComputeCode.valueOf(epoch.getPreComputeRes()) == ComputeCode.FAILURE)) {
					LOGGER.error("I am proc {} , cid {}, precompute fail, will rollback",
							topology.getStaticConf().getProcessId(), cid);
					getDefaultExecutor().preComputeRollback(cid, epoch.getBatchId());
					updateConsensusSetting(epoch);
					decide(epoch);
				} else if (!Arrays.equals(value, epoch.propAndAppValueHash)) {
					// Leader does evil to me only, need to roll back
					LOGGER.error(
							"(computeAccept) I am proc {}, My last regency is {}, Quorum is satisfied, but leader maybe do evil, will goto pre compute rollback branch!",
							topology.getStaticConf().getProcessId(),
							tomLayer.getSynchronizer().getLCManager().getLastReg());
					LOGGER.error(
							"(computeAccept) I am proc {}, my cid is {}, my propose value hash is {}, recv propose value hash is {}, my epoc timestamp is {}",
731
							topology.getStaticConf().getProcessId(), cid, epoch.propAndAppValueHash, value,
732 733 734 735 736 737 738 739 740 741 742
							epoch.getTimestamp());
					// rollback
					getDefaultExecutor().preComputeRollback(cid, epoch.getBatchId());
					// This round of consensus has been rolled back, mark it
					tomLayer.execManager.updateConsensus(tomLayer.getInExec());

					updateConsensusSetting(epoch);

					decide(epoch);

					// Pause processing of new messages, Waiting for trigger state transfer
//                tomLayer.requestsTimer.Enabled(false);
//                tomLayer.requestsTimer.stopTimer();

					if (!tomLayer.execManager.stopped()) {
						tomLayer.execManager.stop();
					}
				}
				return;
			}

			// consensus nodes' pre-computed hashes are inconsistent
			if (((epoch.countAcceptSetted() == topology.getCurrentViewN())
					&& (epoch.countAccept(value) < topology.getQuorum() + 1))
					|| ((epoch.countAcceptSetted() > 2f) && (epoch.countAccept(value) < topology.getCurrentViewF() + 1)
							&& (epoch.maxSameValueCount() < topology.getCurrentViewF() + 1))) {

				LOGGER.error(
						"Quorum is not satisfied, node's pre compute hash is inconsistent, will goto pre compute rollback phase!");
				getDefaultExecutor().preComputeRollback(cid, epoch.getBatchId());
				updateConsensusSetting(epoch);

				updatedResp = getDefaultExecutor().updateResponses(epoch.getAsyncResponseLinkedList(), epoch.commonHash,
						true);
				epoch.setAsyncResponseLinkedList(updatedResp);
				decide(epoch);
			}
		} catch (Throwable e) {
			e.printStackTrace();
		}

	}

	// A non-Reconfig request whose view ID lags behind this node's current view
	private boolean ViewIdBackWard(TOMMessage tomMessage) {
		return tomMessage.getViewID() < this.topology.getCurrentViewId()
				&& tomMessage.getReqType() != TOMMessageType.RECONFIG;
	}

	// A Reconfig request
	private boolean isReconfig(TOMMessage tomMessage) {
		return tomMessage.getReqType() == TOMMessageType.RECONFIG;
	}

	/**
	 * This is the method invoked when a value is decided by this process
	 * 
	 * @param epoch Epoch at which the decision is made
	 */
	private void decide(Epoch epoch) {
		if (epoch.getConsensus().getDecision().firstMessageProposed != null)
			epoch.getConsensus().getDecision().firstMessageProposed.decisionTime = System.nanoTime();

		epoch.getConsensus().decided(epoch, true);
	}

	public synchronized void shutdown() {
		doWork = false;

//		if (!doWork) {
//			return;
//		}
//		doWork = false;
//		thrdWork.interrupt();
//		try {
//			thrdWork.join();
//		} catch (InterruptedException e) {
//		}
//		thrdWork = null;
	}

	public synchronized void start() {
		doWork = true;

//		if (doWork) {
//			return;
//		}
//		thrdWork = new Thread(new Runnable() {
//			@Override
//			public void run() {
//				LOGGER.info("Start processing consensus message ... [CurrentProcessId={}]",
//						controller.getCurrentProcessId());
//
//				startProcessPropose();
//
//				LOGGER.info("Exit processing consensus message!! --[CurrentProcessId={}]",
//						controller.getCurrentProcessId());
//			}
//		}, "Acceptor");
//		thrdWork.setDaemon(true);
//
//		doWork = true;
//
//		thrdWork.start();
	}

}