diff --git a/bin/BFT-SMaRt.jar b/bin/BFT-SMaRt.jar index 45ac0f1315ea016ea484ebe523dd9ce00a9de495..3d30d8fa9eecf4190265626f00020b4039aabf5b 100644 Binary files a/bin/BFT-SMaRt.jar and b/bin/BFT-SMaRt.jar differ diff --git a/config/system.config b/config/system.config index 428be70c27486a5435b181a758b6b167474fba13..802876415c58bd6230cde94cdcbdfc151c184494 100644 --- a/config/system.config +++ b/config/system.config @@ -64,7 +64,7 @@ system.servers.num = 4 system.servers.f = 1 #Timeout to asking for a client request. This value should be greater than the batch timeout -system.totalordermulticast.timeout = 2000 +system.totalordermulticast.timeout = 20000000 #Batch timeout. If set to any non-positive integer value, the next consensus instance #is triggered as soon as (1) the previous one is finished, and (2) any number new requests arrive, @@ -115,29 +115,32 @@ system.totalordermulticast.invoketimeout = 40000 ######### Control Flow Mechanism ########### ############################################ -#Maximum number of total pending requests that each replica can hold. Set to -1 to disable control flow -system.totalordermulticast.maxpendingreqs = 500000 +#Enable/disable control flow mechanism +system.controlflow.controlflow = true + +#Maximum number of total pending requests that each replica can hold. Set to -1 to disable. +system.controlflow.maxpendingreqs = 5 #Preferred number of total pending requests. Once each replica hold up to this many requests, #it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingreqs -system.totalordermulticast.preferredpendingreqs = 250000 +system.controlflow.preferredpendingreqs = 2 -#Maximum number of total pending decisions that each replica can hold. -system.totalordermulticast.maxpendingdecs = 200000 +#Maximum number of total pending decisions that each replica can hold. Set to -1 to disable. +system.controlflow.maxpendingdecs = 200 #Preferred number of total pending decisions. Once each replica hold up to this many decisions, #it accepts requests again from clients. It must be lower than system.totalordermulticast.maxpendingdecs -system.totalordermulticast.preferredpendingdecs = 100000 +system.controlflow.preferredpendingdecs = 100 -#Maximum used memory that each replica should reach, in bytes. -system.totalordermulticast.maxusedmemory = 524288000 +#Maximum used memory that each replica should reach, in bytes. Set to -1 to disable. +system.controlflow.maxusedmemory = 524288000 #Preferred used memory, in bytes. Once each replica decreses its memory to this amount, #it accepts requests again from clients. It must be lower than system.totalordermulticast.maxusedmemory -system.totalordermulticast.preferredusedmemory = 262144000 +system.controlflow.preferredusedmemory = 262144000 #Timeout for the client-side retrying mechanism, in miliseconds -system.totalordermulticast.controlflowtimeout = 1000 +system.controlflow.timeout = 1000 ############################################ ###### State Transfer Configurations ####### diff --git a/src/bftsmart/clientsmanagement/ClientsManager.java b/src/bftsmart/clientsmanagement/ClientsManager.java index 3e41ce8a1f7c9be301ca75c01120920e6b0a5f78..0d8d0c587319738191cd88e411e05c985df28f8d 100644 --- a/src/bftsmart/clientsmanagement/ClientsManager.java +++ b/src/bftsmart/clientsmanagement/ClientsManager.java @@ -28,6 +28,7 @@ import bftsmart.tom.core.messages.TOMMessage; import bftsmart.tom.core.messages.TOMMessageType; import bftsmart.tom.leaderchange.RequestsTimer; import bftsmart.tom.server.RequestVerifier; +import bftsmart.tom.util.TOMUtil; import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -270,16 +271,15 @@ public class ClientsManager { public boolean requestReceived(TOMMessage request, boolean fromClient) { int pendingReqs = countPendingRequests(); - int pendingDecs = dt.getDecisionsInQueue(); + int pendingDecs = dt.getPendingDecisions(); long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); //control flow mechanism - if (fromClient) { - if (this.controller.getStaticConf().getMaxPendigReqs() > 0) { + if (fromClient && this.controller.getStaticConf().getControlFlow()) { - if (pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs() || - pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs() || - usedMemory >= this.controller.getStaticConf().getMaxUsedMemory()) + if ((this.controller.getStaticConf().getMaxPendigReqs() > 0 && pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs()) || + (this.controller.getStaticConf().getMaxPendigDecs() > 0 && pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs()) || + (this.controller.getStaticConf().getMaxUsedMemory() > 0 && usedMemory >= this.controller.getStaticConf().getMaxUsedMemory())) { ignore = true; @@ -291,11 +291,11 @@ public class ClientsManager { ignore = false; } - } if (ignore) { - if(usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection + if(this.controller.getStaticConf().getMaxUsedMemory() > 0 && + usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection logger.warn("Discarding message due to control flow mechanism\n" + "\tMaximum requests are {}, current requests at {}\n" + @@ -303,7 +303,8 @@ public class ClientsManager { "\tMaximum memory is {} current memory at {}\n", this.controller.getStaticConf().getMaxPendigReqs(), pendingReqs, this.controller.getStaticConf().getMaxPendigDecs(), pendingDecs, - this.controller.getStaticConf().getMaxUsedMemory(), usedMemory); + TOMUtil.humanReadableByteCount(this.controller.getStaticConf().getMaxUsedMemory(), false), + TOMUtil.humanReadableByteCount(usedMemory, false)); return false; } @@ -419,7 +420,7 @@ public class ClientsManager { } public void sendAck(boolean fromClient, TOMMessage request) { - if ((fromClient || !request.ackSent) && this.controller.getStaticConf().getMaxPendigReqs() > 0 && cs != null) { + if ((fromClient || !request.ackSent) && this.controller.getStaticConf().getControlFlow() && cs != null) { logger.debug("Sending ACK to client {}", request.getSender()); diff --git a/src/bftsmart/reconfiguration/util/TOMConfiguration.java b/src/bftsmart/reconfiguration/util/TOMConfiguration.java index 8fa9012f8495315bd3a1ffb7de9a0f056121bb4b..757f0e375f168b08dc33074aca5c5bae9b52c690 100644 --- a/src/bftsmart/reconfiguration/util/TOMConfiguration.java +++ b/src/bftsmart/reconfiguration/util/TOMConfiguration.java @@ -50,6 +50,7 @@ public class TOMConfiguration extends Configuration { protected int outQueueSize; protected boolean shutdownHookEnabled; protected boolean useSenderThread; + protected boolean controlFlow; private int numNIOThreads; private int useMACs; private int useSignatures; @@ -126,21 +127,11 @@ public class TOMConfiguration extends Configuration { s = (String) configs.remove("system.totalordermulticast.invoketimeout"); if (s == null) { - invokeTimeout = 1000; + invokeTimeout = 40000; } else { invokeTimeout = Integer.parseInt(s); if (invokeTimeout <= 0) { - invokeTimeout = 1000; - } - } - - s = (String) configs.remove("system.totalordermulticast.controlflowtimeout"); - if (s == null) { - controlFlowTimeout = 40000; - } else { - controlFlowTimeout = Integer.parseInt(s); - if (controlFlowTimeout <= 0) { - controlFlowTimeout = 40000; + invokeTimeout = 40000; } } @@ -181,7 +172,14 @@ public class TOMConfiguration extends Configuration { maxBatchSize = Integer.parseInt(s); } - s = (String) configs.remove("system.totalordermulticast.maxpendingreqs"); + s = (String) configs.remove("system.controlflow.controlflow"); + if (s == null) { + controlFlow = true; + } else { + controlFlow = Boolean.parseBoolean(s); + } + + s = (String) configs.remove("system.controlflow.maxpendingreqs"); if (s == null) { maxPendingReqs = 100000; } else { @@ -191,7 +189,7 @@ public class TOMConfiguration extends Configuration { } } - s = (String) configs.remove("system.totalordermulticast.preferredpendingreqs"); + s = (String) configs.remove("system.controlflow.preferredpendingreqs"); if (s == null) { preferredPendingReqs = 10000; } else { @@ -199,7 +197,7 @@ public class TOMConfiguration extends Configuration { } - s = (String) configs.remove("system.totalordermulticast.maxpendingdecs"); + s = (String) configs.remove("system.controlflow.maxpendingdecs"); if (s == null) { maxPendingDecs = 100000; } else { @@ -209,7 +207,7 @@ public class TOMConfiguration extends Configuration { } } - s = (String) configs.remove("system.totalordermulticast.preferredpendingdecs"); + s = (String) configs.remove("system.controlflow.preferredpendingdecs"); if (s == null) { preferredPendingDecs = 10000; } else { @@ -217,7 +215,7 @@ public class TOMConfiguration extends Configuration { } - s = (String) configs.remove("system.totalordermulticast.maxusedmemory"); + s = (String) configs.remove("system.controlflow.maxusedmemory"); if (s == null) { maxUsedMemory = 100000; } else { @@ -227,7 +225,7 @@ public class TOMConfiguration extends Configuration { } } - s = (String) configs.remove("system.totalordermulticast.preferredusedmemory"); + s = (String) configs.remove("system.controlflow.preferredusedmemory"); if (s == null) { preferredUsedMemory = 10000; } else { @@ -235,6 +233,16 @@ public class TOMConfiguration extends Configuration { } + s = (String) configs.remove("system.controlflow.timeout"); + if (s == null) { + controlFlowTimeout = 1000; + } else { + controlFlowTimeout = Integer.parseInt(s); + if (controlFlowTimeout <= 0) { + controlFlowTimeout = 1000; + } + } + s = (String) configs.remove("system.totalordermulticast.replayVerificationTime"); if (s == null) { replyVerificationTime = 0; @@ -255,7 +263,7 @@ public class TOMConfiguration extends Configuration { } else { useSenderThread = Boolean.parseBoolean(s); } - + s = (String) configs.remove("system.communication.numNIOThreads"); if (s == null) { numNIOThreads = 2; @@ -577,31 +585,35 @@ public class TOMConfiguration extends Configuration { public int getCheckpointPeriod() { return checkpointPeriod; } + + public boolean getControlFlow() { + return controlFlow; + } - public boolean isToWriteCkpsToDisk() { - return isToWriteCkpsToDisk; - } - - public boolean isToWriteSyncCkp() { - return syncCkp; - } + public boolean isToWriteCkpsToDisk() { + return isToWriteCkpsToDisk; + } - public boolean isToLog() { - return isToLog; - } + public boolean isToWriteSyncCkp() { + return syncCkp; + } - public boolean isToWriteSyncLog() { - return syncLog; - } + public boolean isToLog() { + return isToLog; + } - public boolean logToDisk() { - return logToDisk; - } + public boolean isToWriteSyncLog() { + return syncLog; + } - public boolean isToLogParallel() { - // TODO Auto-generated method stub - return parallelLog; - } + public boolean logToDisk() { + return logToDisk; + } + + public boolean isToLogParallel() { + // TODO Auto-generated method stub + return parallelLog; + } /** * Indicates the checkpoint period used when fetching the state from the application diff --git a/src/bftsmart/tom/AsynchServiceProxy.java b/src/bftsmart/tom/AsynchServiceProxy.java index 358bd2fd39103d16e92f69e8385030d36a76e0bb..5b68892824880d3af7f9c6992a1642593fa99564 100644 --- a/src/bftsmart/tom/AsynchServiceProxy.java +++ b/src/bftsmart/tom/AsynchServiceProxy.java @@ -16,7 +16,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +39,7 @@ public class AsynchServiceProxy extends ServiceProxy { private Thread cleanerThread; private LinkedBlockingQueue invokeQueue; private Thread invokeThread; + private boolean doWork = true; /** * Constructor @@ -101,9 +101,12 @@ public class AsynchServiceProxy extends ServiceProxy { public void run() { - while (true) { + while (doWork) { try { + + if (cleanQueue.poll(200, TimeUnit.MILLISECONDS) == null) continue; + int requestId = cleanQueue.take(); Integer id = requestId; @@ -146,12 +149,16 @@ public class AsynchServiceProxy extends ServiceProxy { public void run() { - while(true) { + while(doWork) { try { - RequestContext requestContext = invokeQueue.take(); + RequestContext requestContext = null; + + requestContext = invokeQueue.poll(200, TimeUnit.MILLISECONDS); - logger.debug("Dequeing invoke at client {} for request #{}", getViewManager().getStaticConf().getProcessId(), requestContext.getOperationId()); + if (requestContext == null) continue; + + logger.debug("Dequeued invoke at client {} for request #{}", getViewManager().getStaticConf().getProcessId(), requestContext.getOperationId()); invokeAsynch(requestContext); @@ -264,6 +271,11 @@ public class AsynchServiceProxy extends ServiceProxy { } + @Override + public void close() { + doWork = false; + super.close(); + } /** * This is the method invoked by the client side communication system. * @@ -459,7 +471,8 @@ public class AsynchServiceProxy extends ServiceProxy { TOMulticast(sm); - if (!reqCtx.getDoS() && getViewManager().getStaticConf().getMaxPendigReqs() > 0) { + //Control flow + if (!reqCtx.getDoS() && getViewManager().getStaticConf().getControlFlow()) { while (true) { if (this.controlFlow.tryAcquire(getViewManager().getStaticConf().getControlFlowTimeout(), TimeUnit.MILLISECONDS)) { diff --git a/src/bftsmart/tom/ServiceProxy.java b/src/bftsmart/tom/ServiceProxy.java index feb52dec3b416c9617b8abc9990e02586603b638..abf64c53ede4281093d059f86f3c563836cc0e93 100644 --- a/src/bftsmart/tom/ServiceProxy.java +++ b/src/bftsmart/tom/ServiceProxy.java @@ -266,7 +266,9 @@ public class ServiceProxy extends TOMSender { } }else{ - if (getViewManager().getStaticConf().getMaxPendigReqs() > 0) { + //Control flow + if (getViewManager().getStaticConf().getControlFlow()) { + while (true) { if (this.controlFlow.tryAcquire(getViewManager().getStaticConf().getControlFlowTimeout(), TimeUnit.MILLISECONDS)) { diff --git a/src/bftsmart/tom/core/DeliveryThread.java b/src/bftsmart/tom/core/DeliveryThread.java index 11e1b7edbf7781ea5532c66e2dd7d9a89096dc45..88f4a9d091d9ba93bd40ace66d4bff80ad7b81aa 100644 --- a/src/bftsmart/tom/core/DeliveryThread.java +++ b/src/bftsmart/tom/core/DeliveryThread.java @@ -46,6 +46,7 @@ public final class DeliveryThread extends Thread { private boolean doWork = true; private int lastReconfig = -2; + private int currentDecisions = 0; private final LinkedBlockingQueue decided; private final TOMLayer tomLayer; // TOM layer private final ServiceReplica receiver; // Object that receives requests from clients @@ -76,8 +77,8 @@ public final class DeliveryThread extends Thread { return recoverer; } - public int getDecisionsInQueue() { - return decided.size(); + public int getPendingDecisions() { + return decided.size() + currentDecisions; } /** @@ -181,6 +182,9 @@ public final class DeliveryThread extends Thread { @Override public void run() { while (doWork) { + + currentDecisions = 0; + /** THIS IS JOAO'S CODE, TO HANDLE STATE TRANSFER */ deliverLock(); while (tomLayer.isRetrievingState()) { @@ -210,6 +214,9 @@ public final class DeliveryThread extends Thread { if (!doWork) break; if (decisions.size() > 0) { + + currentDecisions = decisions.size(); + TOMMessage[][] requests = new TOMMessage[decisions.size()][]; int[] consensusIds = new int[requests.length]; int[] leadersIds = new int[requests.length]; diff --git a/src/bftsmart/tom/util/TOMUtil.java b/src/bftsmart/tom/util/TOMUtil.java index 7e164899fd47cd1c2aa485144acb00467e66b9e2..96b4caf6c4d6d1c8f349cca4d2158f7928873e73 100644 --- a/src/bftsmart/tom/util/TOMUtil.java +++ b/src/bftsmart/tom/util/TOMUtil.java @@ -95,6 +95,14 @@ public class TOMUtil { } } + public static String humanReadableByteCount(long bytes, boolean si) { + int unit = si ? 1000 : 1024; + if (bytes < unit) return bytes + " B"; + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i"); + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } + //******* EDUARDO BEGIN **************// public static byte[] getBytes(Object o) { ByteArrayOutputStream bOut = new ByteArrayOutputStream();