ServiceProxy.java 19.6 KB
Newer Older
P
pjsousa@gmail.com 已提交
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14
Copyright (c) 2007-2013 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
15
 */
16
package bftsmart.tom;
P
pjsousa@gmail.com 已提交
17

H
huanghaiquan 已提交
18 19 20 21 22 23 24 25 26 27
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.LoggerFactory;

28
import bftsmart.reconfiguration.ReconfigureReply;
29
import bftsmart.reconfiguration.util.TOMConfiguration;
30
import bftsmart.reconfiguration.views.View;
31
import bftsmart.reconfiguration.views.ViewStorage;
32 33 34 35
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
import bftsmart.tom.util.Extractor;
import bftsmart.tom.util.TOMUtil;
H
huanghaiquan 已提交
36
import utils.exception.ViewObsoleteException;
S
shaozhuguang 已提交
37

P
pjsousa@gmail.com 已提交
38
/**
39
 * This class implements a TOMSender and represents a proxy to be used on the
40 41
 * client side of the replicated system. It sends a request to the replicas,
 * receives the reply, and delivers it to the application.
P
pjsousa@gmail.com 已提交
42 43 44
 */
public class ServiceProxy extends TOMSender {

45 46 47 48
	// Locks for send requests and receive replies
	protected ReentrantLock canReceiveLock = new ReentrantLock();
	protected ReentrantLock canSendLock = new ReentrantLock();
	private Semaphore sm = new Semaphore(0);
49
	private volatile int reqId = -1; // request id
50 51 52 53 54 55
	private int operationId = -1; // request id
	private TOMMessageType requestType;
	private int replyQuorum = 0; // size of the reply quorum
	private TOMMessage replies[] = null; // Replies from replicas are stored here
	private int receivedReplies = 0; // Number of received replies
	private TOMMessage response = null; // Reply delivered to the application
56
	private int invokeTimeout = 60;
57 58 59 60 61 62
	private Comparator<byte[]> comparator;
	private Extractor extractor;
	private Random rand = new Random(System.currentTimeMillis());
	private int replyServer;
	private HashResponseController hashResponseController;
	private int invokeUnorderedHashedTimeout = 10;
Z
zhangshuang 已提交
63
	private boolean viewObsolete = false;
64
	private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ServiceProxy.class);
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
//	/**
//	 * Constructor
//	 *
//	 * @see bellow
//	 */
//	public ServiceProxy(int processId) {
//		this(processId, "config/system.config", "config/hosts.config", "config", null, null);
//	}
//
//	/**
//	 * Constructor
//	 *
//	 * @see bellow
//	 */
//	public ServiceProxy(int processId, String configHome) {
//		this(processId, "config/system.config", "config/hosts.config", "config", null, null);
//	}
83 84 85 86

	/**
	 * Constructor
	 *
87 88 89 90 91 92 93 94 95 96
	 * @param processId
	 *            Process id for this client (should be different from replicas)
	 * @param configHome
	 *            Configuration directory for BFT-SMART
	 * @param replyComparator
	 *            used for comparing replies from different servers to extract one
	 *            returned by f+1
	 * @param replyExtractor
	 *            used for extracting the response from the matching quorum of
	 *            replies
97
	 */
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
//	public ServiceProxy(int processId, String systemConfigFile, String hostsConfigFile, String keystoreHome, Comparator<byte[]> replyComparator,
//			Extractor replyExtractor) {
//		this(new TOMConfiguration(processId,systemConfigFile, hostsConfigFile, keystoreHome), replyComparator, replyExtractor);
//		
////		if (configHome == null) {
////			init(processId);
////		} else {
////			init(processId, configHome);
////		}
////
////		replies = new TOMMessage[getViewManager().getCurrentViewN()];
////
////		comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() {
////			@Override
////			public int compare(byte[] o1, byte[] o2) {
////				return Arrays.equals(o1, o2) ? 0 : -1;
////			}
////		};
////
////		extractor = (replyExtractor != null) ? replyExtractor : new Extractor() {
////
////			@Override
////			public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) {
////				return replies[lastReceived];
////			}
////		};
//	}
125
	
126
	public ServiceProxy(TOMConfiguration config, Comparator<byte[]> replyComparator, Extractor replyExtractor) {
127 128 129 130 131
		this(config, null, replyComparator, replyExtractor);
	}

	public ServiceProxy(TOMConfiguration config, ViewStorage viewStorage, Comparator<byte[]> replyComparator, Extractor replyExtractor) {
		init(config, viewStorage);
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151

		replies = new TOMMessage[getViewManager().getCurrentViewN()];

		comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() {
			@Override
			public int compare(byte[] o1, byte[] o2) {
				return Arrays.equals(o1, o2) ? 0 : -1;
			}
		};

		extractor = (replyExtractor != null) ? replyExtractor : new Extractor() {

			@Override
			public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) {
				return replies[lastReceived];
			}
		};
	}

	/**
152 153
	 * Get the amount of time (in seconds) that this proxy will wait for servers
	 * replies before returning null.
154 155 156 157 158 159 160 161 162 163 164 165
	 *
	 * @return the invokeTimeout
	 */
	public int getInvokeTimeout() {
		return invokeTimeout;
	}

	public int getInvokeUnorderedHashedTimeout() {
		return invokeUnorderedHashedTimeout;
	}

	/**
166 167
	 * Set the amount of time (in seconds) that this proxy will wait for servers
	 * replies before returning null.
168
	 *
169 170
	 * @param invokeTimeout
	 *            the invokeTimeout to set
171 172 173 174 175 176 177 178 179 180
	 */
	public void setInvokeTimeout(int invokeTimeout) {
		this.invokeTimeout = invokeTimeout;
	}

	public void setInvokeUnorderedHashedTimeout(int timeout) {
		this.invokeUnorderedHashedTimeout = timeout;
	}

	public byte[] invokeOrdered(byte[] request) {
181 182 183 184 185 186
		try {
			return invoke(request, TOMMessageType.ORDERED_REQUEST);
		} catch (ViewObsoleteException voe) {
		    close();
			throw voe;
		}
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
	}

	public byte[] invokeUnordered(byte[] request) {
		return invoke(request, TOMMessageType.UNORDERED_REQUEST);
	}

	public byte[] invokeUnorderedHashed(byte[] request) {
		return invoke(request, TOMMessageType.UNORDERED_HASHED_REQUEST);
	}

	/**
	 * This method sends a request to the replicas, and returns the related reply.
	 * If the servers take more than invokeTimeout seconds the method returns null.
	 * This method is thread-safe.
	 *
202 203 204 205 206
	 * @param request
	 *            Request to be sent
	 * @param reqType
	 *            TOM_NORMAL_REQUESTS for service requests, and other for reconfig
	 *            requests.
207 208 209 210 211
	 * @return The reply from the replicas related to request
	 */
	public byte[] invoke(byte[] request, TOMMessageType reqType) {
		canSendLock.lock();

212 213 214 215 216 217
		try {
			// Clean all statefull data to prepare for receiving next replies
			Arrays.fill(replies, null);
			receivedReplies = 0;
			response = null;
			replyQuorum = getReplyQuorum();
218

219 220 221 222
			// Send the request to the replicas, and get its ID
			reqId = generateRequestId(reqType);
			operationId = generateOperationId();
			requestType = reqType;
223

224 225
			replyServer = -1;
			hashResponseController = null;
226

227
			if (requestType == TOMMessageType.UNORDERED_HASHED_REQUEST) {
228

229 230
				replyServer = getRandomlyServerId();
				LOGGER.debug("[{}] replyServerId {} pos at {}", this.getClass().getName(), replyServer, getViewManager().getCurrentViewPos(replyServer));
231

232 233
				hashResponseController = new HashResponseController(getViewManager().getCurrentViewPos(replyServer),
						getViewManager().getCurrentViewProcesses().length);
234

235 236 237
				TOMMessage sm = new TOMMessage(getProcessId(), getSession(), reqId, operationId, request,
						getViewManager().getCurrentViewId(), requestType);
				sm.setReplyServer(replyServer);
238

239
				TOMulticast(sm);
240
			} else {
241 242 243
				TOMulticast(request, reqId, operationId, reqType);
			}

244
			LOGGER.info("Sending request {} with reqId {}, operationId {}, clientId={}, hash = {}", reqType, reqId, operationId, getProcessId(), this.hashCode());
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
			LOGGER.debug("Expected number of matching replies: {}", replyQuorum);

			// This instruction blocks the thread, until a response is obtained.
			// The thread will be unblocked when the method replyReceived is invoked
			// by the client side communication system
			try {
				if (reqType == TOMMessageType.UNORDERED_HASHED_REQUEST) {
					if (!this.sm.tryAcquire(invokeUnorderedHashedTimeout, TimeUnit.SECONDS)) {
						LOGGER.debug("######## UNORDERED HASHED REQUEST TIMOUT ########");
						return invoke(request, TOMMessageType.ORDERED_REQUEST);
					}
				} else {
					if (!this.sm.tryAcquire(invokeTimeout, TimeUnit.SECONDS)) {
						LOGGER.error("###################TIMEOUT#######################");
						LOGGER.error("Reply timeout for reqId is {}", reqId);
						LOGGER.error("Process id {} // req id {} // TIMEOUT // ", getProcessId(), reqId);
						LOGGER.error("Replies received: {}", receivedReplies);
						LOGGER.error("Replies quorum: {}", replyQuorum);
263
						checkReplyNum(reqType, receivedReplies, replyQuorum);
264 265
						return null;
					}
266
				}
267 268
			} catch (ViewObsoleteException voe) {
				throw voe;
269
			} catch (Exception ex) {
270
				ex.printStackTrace();
271 272
			}

273
			LOGGER.debug("Response extracted {}", response);
274

275
			byte[] ret = null;
276

277 278 279 280
			if (response == null) {
				// the response can be null if n-f replies are received but there isn't
				// a replyQuorum of matching replies
				LOGGER.error("Received n-f replies and no response could be extracted. request.length = {}, type = {} !", request.length, reqType);
281

282 283 284 285 286 287 288 289 290 291
//				if (reqType == TOMMessageType.UNORDERED_REQUEST || reqType == TOMMessageType.UNORDERED_HASHED_REQUEST) {
//					// invoke the operation again, whitout the read-only flag
//					LOGGER.debug("###################RETRY#######################");
//					return invokeUnordered(request);
//				} else {
//					throw new RuntimeException("Received n-f replies without f+1 of them matching.");
//				}

				throw new RuntimeException("Received n-f replies without f+1 of them matching.");

292
			} else {
293 294 295 296 297 298 299 300 301 302 303
				// normal operation
				// ******* EDUARDO BEGIN **************//
				if (reqType == TOMMessageType.ORDERED_REQUEST) {
					// Reply to a normal request!
					if (response.getViewID() == getViewManager().getCurrentViewId()) {
						ret = response.getContent(); // return the response
					} else {// if(response.getViewID() > getViewManager().getCurrentViewId())
						// updated view received
						reconfigureTo((View) TOMUtil.getObject(response.getContent()));

						LOGGER.warn("Service proxy view id little than service replica view id, will re invoke request!");
304
						return invoke(request, reqType);
305 306 307 308 309 310 311 312 313 314
					}
				} else if (reqType == TOMMessageType.UNORDERED_REQUEST
						|| reqType == TOMMessageType.UNORDERED_HASHED_REQUEST) {
					ret = response.getContent(); // return the response
					if (response.getViewID() > getViewManager().getCurrentViewId()) {
						Object r = TOMUtil.getObject(response.getContent());
						if (r instanceof View) {
							reconfigureTo((View) r);
							return invoke(request, reqType);
						}
315
					}
316
				} else {
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
					if (response.getViewID() > getViewManager().getCurrentViewId()) {
						// Reply to a reconfigure request!
						LOGGER.debug("Reconfiguration request' reply received!");
						Object r = TOMUtil.getObject(response.getContent());
						if (r instanceof View) { // did not executed the request because it is using an outdated view
							reconfigureTo((View) r);

							return invoke(request, reqType);
						} else if (r instanceof ReconfigureReply) { // reconfiguration executed!
							reconfigureTo(((ReconfigureReply) r).getView());
							ret = response.getContent();
						} else {
							LOGGER.error("Unknown response type");
						}
					} else {
						LOGGER.error("Unexpected execution flow");
					}
334 335
				}
			}
336 337 338
			return ret;
		} finally {
			canSendLock.unlock();
339 340 341
		}
	}

342
	private void checkReplyNum(TOMMessageType reqType, int receivedReplies, int replyQuorum) {
Z
zhangshuang 已提交
343
//		if (reqType == TOMMessageType.ORDERED_REQUEST) {
344 345 346 347 348 349 350 351 352 353 354
			LOGGER.info("checkReplyNum, receivedReplies = {}, replyQuorum = {}, viewObsolete = {}", receivedReplies, replyQuorum, viewObsolete);
			if (receivedReplies > 0 && receivedReplies < replyQuorum && viewObsolete) {
				LOGGER.info("################################################################################################################################");
				LOGGER.info("########Consensus Client Recv Reply Num Is Not Satisfy Quorum, Client View Is Obsolete, Please Try To Re-auth Peer Node!########");
				LOGGER.info("################################################################################################################################");
				throw new ViewObsoleteException("Consensus Client View Obsolete, Please Try To Re Conn!");
			} else if (receivedReplies == 0) {
				LOGGER.info("####################################################################################################################################################");
				LOGGER.info("########Consensus Client Recv Reply Num Is 0, Client View Is Serious Obsolete or Consensus Node Block, Please Restart All Nodes And Gateway!########");
				LOGGER.info("####################################################################################################################################################");
			}
Z
zhangshuang 已提交
355
//		}
Z
zhangshuang 已提交
356 357
	}

358
	// ******* EDUARDO BEGIN **************//
359
	protected void reconfigureTo(View v) {
Z
zhangshuang 已提交
360
		LOGGER.debug("Installing a most up-to-date view with id {}", v.getId());
361 362 363 364 365
		getViewManager().reconfigureTo(v);
		getViewManager().getViewStore().storeView(v);
		replies = new TOMMessage[getViewManager().getCurrentViewN()];
		getCommunicationSystem().updateConnections();
	}
366
	// ******* EDUARDO END **************//
367 368 369 370

	/**
	 * This is the method invoked by the client side communication system.
	 *
371 372
	 * @param reply
	 *            The reply delivered by the client side communication system
373 374 375
	 */
	@Override
	public void replyReceived(TOMMessage reply) {
Z
zhangshuang 已提交
376
		LOGGER.info("Synchronously received reply from {} with sequence number {} ", reply.getSender(), reply.getSequence());
377
		canReceiveLock.lock();
378 379
		try {
			if (reqId == -1) {// no message being expected
380
				LOGGER.info("throwing out request: sender {}, reqId {}, hash = {}", reply.getSender(), reply.getSequence(), this.hashCode());
381 382 383 384 385
				return;
			}

			int pos = getViewManager().getCurrentViewPos(reply.getSender());

386
			if (pos < 0) { // ignore messages that don't come from replicas
Z
zhangshuang 已提交
387 388 389

				LOGGER.info("received reply from sender {}", reply.getSender());

390 391 392 393 394 395
				return;
			}

			int sameContent = 1;
			if (reply.getSequence() == reqId && reply.getReqType() == requestType) {

Z
zhangshuang 已提交
396
				LOGGER.info("I am proc {}, Receiving reply from {} with reqId {}. Putting on pos {}", this.getProcessId(), reply.getSender(), reply.getSequence(), pos);
397

398 399 400
				if (requestType == TOMMessageType.UNORDERED_HASHED_REQUEST) {
					response = hashResponseController.getResponse(pos, reply);
					if (response != null) {
401 402 403 404 405
						reqId = -1;
						this.sm.release(); // resumes the thread that is executing the "invoke" method
						return;
					}

406
				} else {
Z
zhangshuang 已提交
407 408 409
					if (this.getViewManager().getCurrentView().getId() < reply.getViewID()) {
						viewObsolete = true;
					}
410 411 412 413 414 415
					if (replies[pos] == null) {
						receivedReplies++;
					}
					replies[pos] = reply;

					// Compare the reply just received, to the others
416

417 418 419 420 421
					for (int i = 0; i < replies.length; i++) {

						if ((i != pos || getViewManager().getCurrentViewN() == 1) && replies[i] != null
								&& (comparator.compare(replies[i].getContent(), reply.getContent()) == 0)) {
							sameContent++;
Z
zhangshuang 已提交
422 423 424

							LOGGER.info("sameContent = {}, replyQuorum = {}, request type = {}", sameContent, replyQuorum, replies[i].getReqType());

425
							if (sameContent >= replyQuorum) {
Z
zhangshuang 已提交
426

427 428
								response = extractor.extractResponse(replies, sameContent, pos);
								reqId = -1;
Z
zhangshuang 已提交
429
								viewObsolete = false;
430 431 432 433 434 435
								this.sm.release(); // resumes the thread that is executing the "invoke" method
								return;
							}
						}
					}
				}
436

437 438 439 440
				if (response == null) {
					if (requestType.equals(TOMMessageType.ORDERED_REQUEST)) {
						if (receivedReplies == getViewManager().getCurrentViewN()) {
							reqId = -1;
Z
zhangshuang 已提交
441
							viewObsolete = false;
442 443
							this.sm.release(); // resumes the thread that is executing the "invoke" method
						}
444
					} else if (requestType.equals(TOMMessageType.UNORDERED_HASHED_REQUEST)) {
445 446
						if (hashResponseController.getNumberReplies() == getViewManager().getCurrentViewN()) {
							reqId = -1;
Z
zhangshuang 已提交
447
							viewObsolete = false;
448 449
							this.sm.release(); // resumes the thread that is executing the "invoke" method
						}
450 451 452 453
					} else if (requestType.equals(TOMMessageType.UNORDERED_REQUEST)) {
						// UNORDERED 消息
						if (receivedReplies == getViewManager().getCurrentViewN()) {
							reqId = -1;
Z
zhangshuang 已提交
454
							viewObsolete = false;
455 456
							this.sm.release(); // resumes the thread that is executing the "invoke" method
						}
457 458 459 460 461 462
					} else if (requestType.equals(TOMMessageType.RECONFIG)) {
						if (receivedReplies == getViewManager().getCurrentViewN()) {
							reqId = -1;
							viewObsolete = false;
							this.sm.release(); // resumes the thread that is executing the "invoke" method
						}
463
					} else { // OTHER
464 465
						if (receivedReplies != sameContent) {
							reqId = -1;
Z
zhangshuang 已提交
466
							viewObsolete = false;
467 468 469 470
							this.sm.release(); // resumes the thread that is executing the "invoke" method
						}
					}
				}
S
shaozhuguang 已提交
471 472
			} else {
				LOGGER.info("Ignoring reply from {} with reqId {}. Currently wait reqId {}", reply.getSender(), reply.getSequence(), reqId);
473
			}
474
		} catch (Exception ex) {
475
			LOGGER.error("Problem at ServiceProxy.ReplyReceived()");
476
			ex.printStackTrace();
477
		} finally {
478 479 480 481
			canReceiveLock.unlock();
		}
	}

482
	protected int getReplyQuorum() {
483
		if (getViewManager().getStaticConf().isBFT()) {
484 485
//			return (int) Math.ceil((getViewManager().getCurrentViewN() + getViewManager().getCurrentViewF()) / 2) + 1;
			return getViewManager().getCurrentViewF() + 1;
486 487 488 489 490
		} else {
			return (int) Math.ceil((getViewManager().getCurrentViewN()) / 2) + 1;
		}
	}

491
	private int getRandomlyServerId() {
492 493 494 495 496 497
		int numServers = super.getViewManager().getCurrentViewProcesses().length;
		int pos = rand.nextInt(numServers);

		return super.getViewManager().getCurrentViewProcesses()[pos];
	}

498
	private class HashResponseController {
499
		private TOMMessage reply;
500
		private byte[][] hashReplies;
501 502 503 504 505 506 507 508 509 510
		private int replyServerPos;
		private int countHashReplies;

		public HashResponseController(int replyServerPos, int length) {
			this.replyServerPos = replyServerPos;
			this.hashReplies = new byte[length][];
			this.reply = null;
			this.countHashReplies = 0;
		}

511
		public TOMMessage getResponse(int pos, TOMMessage tomMessage) {
512

513
			if (hashReplies[pos] == null) {
514 515 516
				countHashReplies++;
			}

517
			if (replyServerPos == pos) {
518
				reply = tomMessage;
Z
zhangshuang 已提交
519 520 521 522 523
				try {
					hashReplies[pos] = TOMUtil.computeHash(tomMessage.getContent());
				} catch (NoSuchAlgorithmException e) {
					e.printStackTrace();
				}
524
			} else {
525 526
				hashReplies[pos] = tomMessage.getContent();
			}
Z
zhangshuang 已提交
527
			LOGGER.debug("[{}] hashReplies[{}] = {}", this.getClass().getName(), pos, Arrays.toString(hashReplies[pos]));
528

529
			if (hashReplies[replyServerPos] != null) {
530 531 532 533 534 535 536 537 538
				int sameContent = 1;
				for (int i = 0; i < replies.length; i++) {
					if ((i != replyServerPos || getViewManager().getCurrentViewN() == 1) && hashReplies[i] != null
							&& (Arrays.equals(hashReplies[i], hashReplies[replyServerPos]))) {
						sameContent++;
						if (sameContent >= replyQuorum) {
							return reply;
						}
					}
539
				}
540 541 542 543
			}
			return null;
		}

544
		public int getNumberReplies() {
545 546 547
			return countHashReplies;
		}
	}
P
pjsousa@gmail.com 已提交
548
}