提交 bc6cb243 编写于 作者: J João Sousa

Made more refinements to control flow

上级 7af56c44
无法预览此类型文件
......@@ -64,7 +64,7 @@ system.servers.num = 4
system.servers.f = 1
#Timeout to asking for a client request. This value should be greater than the batch timeout
system.totalordermulticast.timeout = 2000
system.totalordermulticast.timeout = 20000000
#Batch timeout. If set to any non-positive integer value, the next consensus instance
#is triggered as soon as (1) the previous one is finished, and (2) any number new requests arrive,
......@@ -115,29 +115,32 @@ system.totalordermulticast.invoketimeout = 40000
######### Control Flow Mechanism ###########
############################################
#Maximum number of total pending requests that each replica can hold. Set to -1 to disable control flow
system.totalordermulticast.maxpendingreqs = 500000
#Enable/disable control flow mechanism
system.controlflow.controlflow = true
#Maximum number of total pending requests that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingreqs = 5
#Preferred number of total pending requests. Once each replica hold up to this many requests,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingreqs
system.totalordermulticast.preferredpendingreqs = 250000
system.controlflow.preferredpendingreqs = 2
#Maximum number of total pending decisions that each replica can hold.
system.totalordermulticast.maxpendingdecs = 200000
#Maximum number of total pending decisions that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingdecs = 200
#Preferred number of total pending decisions. Once each replica hold up to this many decisions,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingdecs
system.totalordermulticast.preferredpendingdecs = 100000
system.controlflow.preferredpendingdecs = 100
#Maximum used memory that each replica should reach, in bytes.
system.totalordermulticast.maxusedmemory = 524288000
#Maximum used memory that each replica should reach, in bytes. Set to -1 to disable.
system.controlflow.maxusedmemory = 524288000
#Preferred used memory, in bytes. Once each replica decreses its memory to this amount,
#it accepts requests again from clients. It must be lower than system.totalordermulticast.maxusedmemory
system.totalordermulticast.preferredusedmemory = 262144000
system.controlflow.preferredusedmemory = 262144000
#Timeout for the client-side retrying mechanism, in miliseconds
system.totalordermulticast.controlflowtimeout = 1000
system.controlflow.timeout = 1000
############################################
###### State Transfer Configurations #######
......
......@@ -28,6 +28,7 @@ import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.core.messages.TOMMessageType;
import bftsmart.tom.leaderchange.RequestsTimer;
import bftsmart.tom.server.RequestVerifier;
import bftsmart.tom.util.TOMUtil;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -270,16 +271,15 @@ public class ClientsManager {
public boolean requestReceived(TOMMessage request, boolean fromClient) {
int pendingReqs = countPendingRequests();
int pendingDecs = dt.getDecisionsInQueue();
int pendingDecs = dt.getPendingDecisions();
long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
//control flow mechanism
if (fromClient) {
if (this.controller.getStaticConf().getMaxPendigReqs() > 0) {
if (fromClient && this.controller.getStaticConf().getControlFlow()) {
if (pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs() ||
pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs() ||
usedMemory >= this.controller.getStaticConf().getMaxUsedMemory())
if ((this.controller.getStaticConf().getMaxPendigReqs() > 0 && pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs()) ||
(this.controller.getStaticConf().getMaxPendigDecs() > 0 && pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs()) ||
(this.controller.getStaticConf().getMaxUsedMemory() > 0 && usedMemory >= this.controller.getStaticConf().getMaxUsedMemory()))
{
ignore = true;
......@@ -291,11 +291,11 @@ public class ClientsManager {
ignore = false;
}
}
if (ignore) {
if(usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection
if(this.controller.getStaticConf().getMaxUsedMemory() > 0 &&
usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection
logger.warn("Discarding message due to control flow mechanism\n" +
"\tMaximum requests are {}, current requests at {}\n" +
......@@ -303,7 +303,8 @@ public class ClientsManager {
"\tMaximum memory is {} current memory at {}\n",
this.controller.getStaticConf().getMaxPendigReqs(), pendingReqs,
this.controller.getStaticConf().getMaxPendigDecs(), pendingDecs,
this.controller.getStaticConf().getMaxUsedMemory(), usedMemory);
TOMUtil.humanReadableByteCount(this.controller.getStaticConf().getMaxUsedMemory(), false),
TOMUtil.humanReadableByteCount(usedMemory, false));
return false;
}
......@@ -419,7 +420,7 @@ public class ClientsManager {
}
public void sendAck(boolean fromClient, TOMMessage request) {
if ((fromClient || !request.ackSent) && this.controller.getStaticConf().getMaxPendigReqs() > 0 && cs != null) {
if ((fromClient || !request.ackSent) && this.controller.getStaticConf().getControlFlow() && cs != null) {
logger.debug("Sending ACK to client {}", request.getSender());
......
......@@ -50,6 +50,7 @@ public class TOMConfiguration extends Configuration {
protected int outQueueSize;
protected boolean shutdownHookEnabled;
protected boolean useSenderThread;
protected boolean controlFlow;
private int numNIOThreads;
private int useMACs;
private int useSignatures;
......@@ -126,21 +127,11 @@ public class TOMConfiguration extends Configuration {
s = (String) configs.remove("system.totalordermulticast.invoketimeout");
if (s == null) {
invokeTimeout = 1000;
invokeTimeout = 40000;
} else {
invokeTimeout = Integer.parseInt(s);
if (invokeTimeout <= 0) {
invokeTimeout = 1000;
}
}
s = (String) configs.remove("system.totalordermulticast.controlflowtimeout");
if (s == null) {
controlFlowTimeout = 40000;
} else {
controlFlowTimeout = Integer.parseInt(s);
if (controlFlowTimeout <= 0) {
controlFlowTimeout = 40000;
invokeTimeout = 40000;
}
}
......@@ -181,7 +172,14 @@ public class TOMConfiguration extends Configuration {
maxBatchSize = Integer.parseInt(s);
}
s = (String) configs.remove("system.totalordermulticast.maxpendingreqs");
s = (String) configs.remove("system.controlflow.controlflow");
if (s == null) {
controlFlow = true;
} else {
controlFlow = Boolean.parseBoolean(s);
}
s = (String) configs.remove("system.controlflow.maxpendingreqs");
if (s == null) {
maxPendingReqs = 100000;
} else {
......@@ -191,7 +189,7 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.totalordermulticast.preferredpendingreqs");
s = (String) configs.remove("system.controlflow.preferredpendingreqs");
if (s == null) {
preferredPendingReqs = 10000;
} else {
......@@ -199,7 +197,7 @@ public class TOMConfiguration extends Configuration {
}
s = (String) configs.remove("system.totalordermulticast.maxpendingdecs");
s = (String) configs.remove("system.controlflow.maxpendingdecs");
if (s == null) {
maxPendingDecs = 100000;
} else {
......@@ -209,7 +207,7 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.totalordermulticast.preferredpendingdecs");
s = (String) configs.remove("system.controlflow.preferredpendingdecs");
if (s == null) {
preferredPendingDecs = 10000;
} else {
......@@ -217,7 +215,7 @@ public class TOMConfiguration extends Configuration {
}
s = (String) configs.remove("system.totalordermulticast.maxusedmemory");
s = (String) configs.remove("system.controlflow.maxusedmemory");
if (s == null) {
maxUsedMemory = 100000;
} else {
......@@ -227,7 +225,7 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.totalordermulticast.preferredusedmemory");
s = (String) configs.remove("system.controlflow.preferredusedmemory");
if (s == null) {
preferredUsedMemory = 10000;
} else {
......@@ -235,6 +233,16 @@ public class TOMConfiguration extends Configuration {
}
s = (String) configs.remove("system.controlflow.timeout");
if (s == null) {
controlFlowTimeout = 1000;
} else {
controlFlowTimeout = Integer.parseInt(s);
if (controlFlowTimeout <= 0) {
controlFlowTimeout = 1000;
}
}
s = (String) configs.remove("system.totalordermulticast.replayVerificationTime");
if (s == null) {
replyVerificationTime = 0;
......@@ -255,7 +263,7 @@ public class TOMConfiguration extends Configuration {
} else {
useSenderThread = Boolean.parseBoolean(s);
}
s = (String) configs.remove("system.communication.numNIOThreads");
if (s == null) {
numNIOThreads = 2;
......@@ -577,31 +585,35 @@ public class TOMConfiguration extends Configuration {
public int getCheckpointPeriod() {
return checkpointPeriod;
}
public boolean getControlFlow() {
return controlFlow;
}
public boolean isToWriteCkpsToDisk() {
return isToWriteCkpsToDisk;
}
public boolean isToWriteSyncCkp() {
return syncCkp;
}
public boolean isToWriteCkpsToDisk() {
return isToWriteCkpsToDisk;
}
public boolean isToLog() {
return isToLog;
}
public boolean isToWriteSyncCkp() {
return syncCkp;
}
public boolean isToWriteSyncLog() {
return syncLog;
}
public boolean isToLog() {
return isToLog;
}
public boolean logToDisk() {
return logToDisk;
}
public boolean isToWriteSyncLog() {
return syncLog;
}
public boolean isToLogParallel() {
// TODO Auto-generated method stub
return parallelLog;
}
public boolean logToDisk() {
return logToDisk;
}
public boolean isToLogParallel() {
// TODO Auto-generated method stub
return parallelLog;
}
/**
* Indicates the checkpoint period used when fetching the state from the application
......
......@@ -16,7 +16,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,6 +39,7 @@ public class AsynchServiceProxy extends ServiceProxy {
private Thread cleanerThread;
private LinkedBlockingQueue<RequestContext> invokeQueue;
private Thread invokeThread;
private boolean doWork = true;
/**
* Constructor
......@@ -101,9 +101,12 @@ public class AsynchServiceProxy extends ServiceProxy {
public void run() {
while (true) {
while (doWork) {
try {
if (cleanQueue.poll(200, TimeUnit.MILLISECONDS) == null) continue;
int requestId = cleanQueue.take();
Integer id = requestId;
......@@ -146,12 +149,16 @@ public class AsynchServiceProxy extends ServiceProxy {
public void run() {
while(true) {
while(doWork) {
try {
RequestContext requestContext = invokeQueue.take();
RequestContext requestContext = null;
requestContext = invokeQueue.poll(200, TimeUnit.MILLISECONDS);
logger.debug("Dequeing invoke at client {} for request #{}", getViewManager().getStaticConf().getProcessId(), requestContext.getOperationId());
if (requestContext == null) continue;
logger.debug("Dequeued invoke at client {} for request #{}", getViewManager().getStaticConf().getProcessId(), requestContext.getOperationId());
invokeAsynch(requestContext);
......@@ -264,6 +271,11 @@ public class AsynchServiceProxy extends ServiceProxy {
}
@Override
public void close() {
doWork = false;
super.close();
}
/**
* This is the method invoked by the client side communication system.
*
......@@ -459,7 +471,8 @@ public class AsynchServiceProxy extends ServiceProxy {
TOMulticast(sm);
if (!reqCtx.getDoS() && getViewManager().getStaticConf().getMaxPendigReqs() > 0) {
//Control flow
if (!reqCtx.getDoS() && getViewManager().getStaticConf().getControlFlow()) {
while (true) {
if (this.controlFlow.tryAcquire(getViewManager().getStaticConf().getControlFlowTimeout(), TimeUnit.MILLISECONDS)) {
......
......@@ -266,7 +266,9 @@ public class ServiceProxy extends TOMSender {
}
}else{
if (getViewManager().getStaticConf().getMaxPendigReqs() > 0) {
//Control flow
if (getViewManager().getStaticConf().getControlFlow()) {
while (true) {
if (this.controlFlow.tryAcquire(getViewManager().getStaticConf().getControlFlowTimeout(), TimeUnit.MILLISECONDS)) {
......
......@@ -46,6 +46,7 @@ public final class DeliveryThread extends Thread {
private boolean doWork = true;
private int lastReconfig = -2;
private int currentDecisions = 0;
private final LinkedBlockingQueue<Decision> decided;
private final TOMLayer tomLayer; // TOM layer
private final ServiceReplica receiver; // Object that receives requests from clients
......@@ -76,8 +77,8 @@ public final class DeliveryThread extends Thread {
return recoverer;
}
public int getDecisionsInQueue() {
return decided.size();
public int getPendingDecisions() {
return decided.size() + currentDecisions;
}
/**
......@@ -181,6 +182,9 @@ public final class DeliveryThread extends Thread {
@Override
public void run() {
while (doWork) {
currentDecisions = 0;
/** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */
deliverLock();
while (tomLayer.isRetrievingState()) {
......@@ -210,6 +214,9 @@ public final class DeliveryThread extends Thread {
if (!doWork) break;
if (decisions.size() > 0) {
currentDecisions = decisions.size();
TOMMessage[][] requests = new TOMMessage[decisions.size()][];
int[] consensusIds = new int[requests.length];
int[] leadersIds = new int[requests.length];
......
......@@ -95,6 +95,14 @@ public class TOMUtil {
}
}
public static String humanReadableByteCount(long bytes, boolean si) {
int unit = si ? 1000 : 1024;
if (bytes < unit) return bytes + " B";
int exp = (int) (Math.log(bytes) / Math.log(unit));
String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i");
return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
}
//******* EDUARDO BEGIN **************//
public static byte[] getBytes(Object o) {
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册