提交 c344cef0 编写于 作者: J João Sousa

Added counte-measurements agaisnt malicious leader trying to overload the other replicas.

上级 589602a3
无法预览此类型文件
......@@ -125,34 +125,34 @@ system.controlflow.controlflow = true
#Maximum number of total pending requests that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingreqs = 500000
#Preferred number of total pending requests. Once each replica hold up to this many requests,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingreqs
#Preferred number of total pending requests. It must be lower than system.totalordermulticast.maxpendingreqs
system.controlflow.preferredpendingreqs = 250000
#Maximum number of total pending decisions that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingdecs = 200
#Preferred number of total pending decisions. Once each replica hold up to this many decisions,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingdecs
#Preferred number of total pending decisions. It must be lower than system.totalordermulticast.maxpendingdecs
system.controlflow.preferredpendingdecs = 100
#Maximum number of total pending replies that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingreps = 2000
#Preferred number of total pending replies. Once each replica hold up to this many replies,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingreps
#Preferred number of total pending replies. It must be lower than system.totalordermulticast.maxpendingreps
system.controlflow.preferredpendingreps = 1000
#Maximum used memory that each replica should reach, in bytes. Set to -1 to disable.
system.controlflow.maxusedmemory = 524288000
#Preferred used memory, in bytes. Once each replica decreses its memory to this amount,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxusedmemory
#Preferred used memory, in bytes. It must be lower than system.totalordermulticast.maxusedmemory
system.controlflow.preferredusedmemory = 393216000
#Timeout for the client-side retrying mechanism, in miliseconds
system.controlflow.timeout = 1000
#Maximum number of new requests a leader is allowed to send to a replica while that replica is not accepting new
#requests from clients. If this limit i exceeded, the replica will suspect the leader and try to trigger a leader change
system.controlflow.maxreqsfromleader = 10000
############################################
###### State Transfer Configurations #######
############################################
......
......@@ -29,7 +29,6 @@ import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
import bftsmart.tom.leaderchange.RequestsTimer;
import bftsmart.tom.server.RequestVerifier;
import bftsmart.tom.util.TOMUtil;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
......@@ -254,13 +253,38 @@ public class ClientsManager {
}
/**
* Verifies if some reqId is pending.
*
* @param req if some request is pending
* @return true if the request is pending
*/
public boolean isPending(TOMMessage req) {
return isPending(req.getSender(), req.getOperationId());
}
/**
* Verifies if some request is pending.
*
* @param reqId the request identifier
* @param sender the request sender
* @param operationID the request identifier
* @return true if the request is pending
*/
public boolean isPending(int reqId) {
return getPending(reqId) != null;
public boolean isPending(int sender, int operationID) {
ClientData clientData = getClientData(sender);
try {
clientData.clientLock.lock();
return clientData.getPendingRequests().getByOperationID(operationID) != null;
}
finally {
clientData.clientLock.unlock();
}
}
/**
......@@ -288,15 +312,13 @@ public class ClientsManager {
*
* @param request the received request
* @param fromClient the message was received from client or not?
* @param storeMessage the message should be stored or not? (read-only requests are not stored)
* @param cs server com. system to be able to send replies to already processed requests
*
* @return true if the request is ok and is added to the pending messages
* for this client, false if there is some problem and the message was not
* accounted
*/
public boolean requestReceived(TOMMessage request, boolean fromClient) {
long receptionTime = System.nanoTime();
long receptionTimestamp = System.currentTimeMillis();
......
......@@ -110,6 +110,17 @@ public class RequestList extends LinkedList<TOMMessage> {
}
return null;
}
public TOMMessage getByOperationID(int operationID){
for(ListIterator<TOMMessage> li = listIterator(); li.hasNext(); ) {
TOMMessage msg = li.next();
if(msg.getOperationId() == operationID) {
return msg;
}
}
return null;
}
public boolean contains(int id){
for(ListIterator<TOMMessage> li = listIterator(); li.hasNext(); ) {
TOMMessage msg = li.next();
......
......@@ -32,6 +32,7 @@ public class TOMConfiguration extends Configuration {
protected int requestTimeout;
protected int batchTimeout;
protected int controlFlowTimeout;
protected int maxReqsFromLeader;
protected int invokeTimeout;
protected int nettyClientTimeout;
protected int nettyReplicaTimeout;
......@@ -278,6 +279,16 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.controlflow.maxreqsfromleader");
if (s == null) {
maxReqsFromLeader = 10000;
} else {
maxReqsFromLeader = Integer.parseInt(s);
if (maxReqsFromLeader <= 0) {
maxReqsFromLeader = 10000;
}
}
s = (String) configs.remove("system.totalordermulticast.replayVerificationTime");
if (s == null) {
replyVerificationTime = 0;
......@@ -568,6 +579,11 @@ public class TOMConfiguration extends Configuration {
return shutdownHookEnabled;
}
public int getMaxNewReqsFromLeader() {
return maxReqsFromLeader;
}
public boolean isStateTransferEnabled() {
return stateTransferEnabled;
}
......
......@@ -335,6 +335,7 @@ public class AsynchServiceProxy extends ServiceProxy {
logger.debug("Client {} also received ACK from leader, client "+
"can stop re-transmiting request #{}", getProcessId(), requestContext.getOperationId());
this.leader = leader;
Arrays.fill(acks, null);
requestsAcked.add(ackId);
ackId = -1;
......@@ -469,6 +470,8 @@ public class AsynchServiceProxy extends ServiceProxy {
sm.setAckSeq(ackSeq);
//int[] targets = (leader != -1 ? new int[]{leader} : getViewManager().getCurrentViewProcesses());
TOMulticast(sm);
//Control flow
......
......@@ -54,6 +54,7 @@ public class ServiceProxy extends TOMSender {
private int operationId = -1; // request id
protected int ackId = -1; // for the control flow mechanism
protected int ackSeq = -1; // for the control flow mechanism
protected int leader = -1;
private TOMMessageType requestType;
private int replyQuorum = 0; // size of the reply quorum
private TOMMessage replies[] = null; // Replies from replicas are stored here
......@@ -249,6 +250,8 @@ public class ServiceProxy extends TOMSender {
//logger.info("Sending invoke at client {} for request #{}", getViewManager().getStaticConf().getProcessId(), reqId);
//int[] targets = (leader != -1 ? new int[]{leader} : getViewManager().getCurrentViewProcesses());
TOMulticast(sm);
logger.debug("Sending request (" + reqType + ") with reqId=" + reqId);
......@@ -453,6 +456,7 @@ public class ServiceProxy extends TOMSender {
logger.debug("Client {} also received ACK from leader, client "+
"can stop re-transmiting request #{}", getProcessId(), operationId);
this.leader = leader;
Arrays.fill(acks, null);
ackId = -1;
this.controlFlow.release();
......
......@@ -284,6 +284,8 @@ public class ServiceReplica {
for (TOMMessage[] requestsFromConsensus : requests) {
if (requestsFromConsensus.length == 0) continue;
TOMMessage firstRequest = requestsFromConsensus[0];
int requestCount = 0;
noop = true;
......
......@@ -238,7 +238,7 @@ public final class DeliveryThread extends Thread {
cDecs[count] = cDec;
// cons.firstMessageProposed contains the performance counters
if (requests[count][0].equals(d.firstMessageProposed)) {
if (requests[count].length > 0 && requests[count][0].equals(d.firstMessageProposed)) {
long time = requests[count][0].timestamp;
long seed = requests[count][0].seed;
int numOfNonces = requests[count][0].numOfNonces;
......
......@@ -44,6 +44,7 @@ import bftsmart.tom.server.RequestVerifier;
import bftsmart.tom.util.BatchBuilder;
import bftsmart.tom.util.BatchReader;
import bftsmart.tom.util.TOMUtil;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
......@@ -81,11 +82,12 @@ public final class TOMLayer extends Thread implements RequestReceiver {
//timeout for batch
private Timer batchTimer = null;
private long lastRequest = -1;
private long lastRequest = -1;
//used for control flow
private boolean ignore = false;
private int reqsFromLeader = 0;
/**
* Store requests received but still not ordered
*/
......@@ -341,6 +343,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection
ignore = true;
reqsFromLeader = 0;
} else if ((this.controller.getStaticConf().getMaxPendigReqs() < 0 || pendingReqs <= this.controller.getStaticConf().getPreferredPendigReqs()) &&
(this.controller.getStaticConf().getMaxPendigDecs() < 0 || pendingDecs <= this.controller.getStaticConf().getPreferredPendigDecs()) &&
......@@ -349,6 +352,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
{
ignore = false;
reqsFromLeader = 0;
}
if (ignore) {
......@@ -531,6 +535,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
dec.setRegency(syncher.getLCManager().getLastReg());
dec.setLeader(execManager.getCurrentLeader());
// deal with the corner case of a malicious leader trying to overload the system
if (reqsFromLeader > this.controller.getStaticConf().getMaxNewReqsFromLeader() && !isChangingLeader()) { //force a leader change if the proposal is garbage
reqsFromLeader = 0;
getSynchronizer().triggerTimeout(new LinkedList<>());
}
this.dt.delivery(dec); // Sends the decision to the delivery thread
}
......@@ -560,7 +572,54 @@ public final class TOMLayer extends Thread implements RequestReceiver {
requests = batchReader.deserialiseRequests(this.controller);
if (addToClientManager) {
//Control flow mechanism
int pendingReqs = clientsManager.countPendingRequests();
int pendingDecs = dt.getPendingDecisions();
int pendingReps = dt.getReplyManager().getPendingReplies();
long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
if (ignore) {
// deal with the corner case of a malicious leader trying to overload the system
if ((this.controller.getStaticConf().getMaxPendigReqs() > 0 && pendingReqs > this.controller.getStaticConf().getPreferredPendigReqs()) ||
(this.controller.getStaticConf().getMaxPendigDecs() > 0 && pendingDecs > this.controller.getStaticConf().getPreferredPendigDecs()) ||
(this.controller.getStaticConf().getMaxPendigReps() > 0 && pendingReps > this.controller.getStaticConf().getPreferredPendigReps()) ||
(this.controller.getStaticConf().getMaxUsedMemory() > 0 && usedMemory > this.controller.getStaticConf().getPreferredUsedMemory())) {
for (TOMMessage request : requests) {
if (!clientsManager.isPending(request)) reqsFromLeader++;
}
// check if it is time to resume accepting new requests
} else if ((this.controller.getStaticConf().getMaxPendigReqs() < 0 || pendingReqs <= this.controller.getStaticConf().getPreferredPendigReqs()) &&
(this.controller.getStaticConf().getMaxPendigDecs() < 0 || pendingDecs <= this.controller.getStaticConf().getPreferredPendigDecs()) &&
(this.controller.getStaticConf().getMaxPendigReps() < 0 || pendingReps <= this.controller.getStaticConf().getPreferredPendigReps()) &&
(this.controller.getStaticConf().getMaxUsedMemory() < 0 || usedMemory <= this.controller.getStaticConf().getPreferredUsedMemory()))
{
ignore = false;
reqsFromLeader = 0;
}
} else if (this.controller.getStaticConf().getControlFlow()) { //check if it is time to stop accepting new requests
if ((this.controller.getStaticConf().getMaxPendigReqs() > 0 && pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs()) ||
(this.controller.getStaticConf().getMaxPendigDecs() > 0 && pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs()) ||
(this.controller.getStaticConf().getMaxPendigReps() > 0 && pendingReps >= this.controller.getStaticConf().getMaxPendigReps()) ||
(this.controller.getStaticConf().getMaxUsedMemory() > 0 && usedMemory >= this.controller.getStaticConf().getMaxUsedMemory()))
{
if(!ignore &&
usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection
ignore = true;
reqsFromLeader = 0;
}
}
//use parallelization to validate the request
final CountDownLatch latch = new CountDownLatch(requests.length);
......
......@@ -115,6 +115,10 @@ public abstract class TOMSender implements ReplyReceiver, Closeable, AutoCloseab
return opCounter.getAndIncrement();
}
public void TOMulticast(int[] targets, TOMMessage sm) {
cs.send(useSignatures, targets, sm);
}
public void TOMulticast(TOMMessage sm) {
cs.send(useSignatures, this.viewController.getCurrentViewProcesses(), sm);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册