diff --git a/src/navigators/smart/communication/ServerCommunicationSystem.java b/src/navigators/smart/communication/ServerCommunicationSystem.java index 77710d2aa285c0c76bd83ed10792c4bbb55bb888..b65e81e63d64234fa73fe6a7850444a2ba7d2279 100644 --- a/src/navigators/smart/communication/ServerCommunicationSystem.java +++ b/src/navigators/smart/communication/ServerCommunicationSystem.java @@ -47,9 +47,9 @@ public class ServerCommunicationSystem extends Thread { public static int RR_MSG = 4; public static int RT_MSG = 5; - public static int IN_QUEUE_SIZE = 200; + //public static int IN_QUEUE_SIZE = 200; - private LinkedBlockingQueue inQueue = new LinkedBlockingQueue(IN_QUEUE_SIZE); + private LinkedBlockingQueue inQueue = null;//new LinkedBlockingQueue(IN_QUEUE_SIZE); protected MessageHandler messageHandler = new MessageHandler(); @@ -62,6 +62,8 @@ public class ServerCommunicationSystem extends Thread { public ServerCommunicationSystem(TOMConfiguration conf) throws Exception { super("Server CS"); + inQueue = new LinkedBlockingQueue(conf.inQueueSize()); + //create a new conf, with updated port number for servers TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(), Configuration.getHomeDir(), "hosts.config"); diff --git a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java index 32c07cea553a3e36e99d87dedb76e6a2163f109a..b9026ba490d47959bee7559f107a29dc07760b5f 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java +++ b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java @@ -207,32 +207,32 @@ public final class ExecutionManager { int msgType = msg.getPaxosType(); boolean isRetrievingState = tomLayer.isRetrievingState(); - System.out.println("Esta a obter estado? " + isRetrievingState); - System.out.print("Recebi uma mensagem do eid " + consId + " do tipo "); - + String type = null; switch (msgType) { case MessageFactory.PROPOSE: - System.out.println("PROPOSE"); + type = "PROPOSE"; break; case MessageFactory.WEAK: - System.out.println("WEAK"); + type = "WEAK"; break; case MessageFactory.STRONG: - System.out.println("STRONG"); + type = "STRONG"; break; case MessageFactory.DECIDE: - System.out.println("DECIDE"); + type = "DECIDE"; break; case MessageFactory.FREEZE: - System.out.println("FREEZE"); + type = "FREEZE"; break; case MessageFactory.COLLECT: - System.out.println("COLLECT"); + type = "COLLECT"; break; default: - System.out.println(); + type = ""; break; } + System.out.println("Esta a obter estado? " + isRetrievingState); + Logger.println("Recebi uma mensagem do eid " + consId + " do tipo " + type); boolean canProcessTheMessage = false; diff --git a/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java b/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java index 32b4091da9518c47183eb0f9d3760829d84c7f55..34bf42c3ce3a5746bb9b2fdde152511aba34864a 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java +++ b/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java @@ -102,7 +102,7 @@ public class LeaderModule { * @return The replica ID of the leader */ public int getLeader(int c, int r) { - /*** + /***/ List list = leaderInfos.get(c); if (list == null) { //there are no information for the execution c @@ -128,7 +128,7 @@ public class LeaderModule { } return -1; /***/ - return 0; + //return 0; } /** @@ -198,21 +198,9 @@ public class LeaderModule { } } - for (int c = cStart; c < cEnd; c++) { - - //List list = leaderInfos.get(c + 1); - - if (list == null) {//nunca vai acontecer isso!!! - //System.err.println("- Executing a code that wasn't supposed to be executed :-)"); - //System.err.println("- And we have some reports there is a bug here!"); - list = new LinkedList(); - leaderInfos.put(c + 1, list); - List rm = leaderInfos.remove(c); - ConsInfo ci = (ConsInfo) rm.get(rm.size() - 1); - list.add(new ConsInfo(0, ci.leaderId)); - } else { + for (int c = cStart; c <= cEnd; c++) { + leaderInfos.remove(c); - } } diff --git a/src/navigators/smart/tom/core/DeliveryThread.java b/src/navigators/smart/tom/core/DeliveryThread.java index c106e1930aac3eb9e9ecb2a3e0759d8ff45c2af6..1796015f0eab96cedda2a01cc51f60786fa5f578 100644 --- a/src/navigators/smart/tom/core/DeliveryThread.java +++ b/src/navigators/smart/tom/core/DeliveryThread.java @@ -98,6 +98,8 @@ public class DeliveryThread extends Thread { receiver.setState(state.getState()); + tomLayer.lm.addLeaderInfo(state.getLastCheckpointEid(), state.getLastCheckpointRound(), state.getLastCheckpointLeader()); + int lastCheckpointEid = state.getLastCheckpointEid(); int lastEid = state.getLastEid(); @@ -107,6 +109,8 @@ public class DeliveryThread extends Thread { byte[] batch = state.getMessageBatch(eid).batch; // take a batch + tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader); + // obtain an array of requests from the taken consensus BatchReader batchReader = new BatchReader(batch, conf.getUseSignatures()==1); @@ -327,7 +331,7 @@ public class DeliveryThread extends Thread { //define the last stable consensus... the stable consensus can //be removed from the leaderManager and the executionManager - /** + /**/ if (cons.getId() > 2) { int stableConsensus = cons.getId() - 3; diff --git a/src/navigators/smart/tom/demo/RandomServer.java b/src/navigators/smart/tom/demo/RandomServer.java index a4535aaf50ebb331b8d538d33fbd85cf03df0398..b5e701268e3344f75a9f928271f75e7a7b7a21ea 100644 --- a/src/navigators/smart/tom/demo/RandomServer.java +++ b/src/navigators/smart/tom/demo/RandomServer.java @@ -92,7 +92,7 @@ public class RandomServer extends ServiceReplica { System.exit(-1); } - new CounterServer(Integer.parseInt(args[0])); + new RandomServer(Integer.parseInt(args[0])); } @Override diff --git a/src/navigators/smart/tom/util/TOMConfiguration.java b/src/navigators/smart/tom/util/TOMConfiguration.java index e8876ee17d7148193b688d25930db2af9f29daa0..67f35a815bb4cc158627e19eabe04724d138ed36 100644 --- a/src/navigators/smart/tom/util/TOMConfiguration.java +++ b/src/navigators/smart/tom/util/TOMConfiguration.java @@ -32,6 +32,7 @@ public class TOMConfiguration extends Configuration { protected int replyVerificationTime; protected int maxBatchSize; protected int numberOfNonces; + protected int inQueueSize; protected boolean decideMessagesEnabled; protected boolean verifyTimestamps; protected boolean useSenderThread; @@ -69,6 +70,7 @@ public class TOMConfiguration extends Configuration { this.useSignatures = conf.useSignatures; this.checkpoint_period = conf.checkpoint_period; this.useControlFlow = conf.useControlFlow; + this.inQueueSize = inQueueSize; } /** Creates a new instance of TOMConfiguration */ @@ -239,6 +241,13 @@ public class TOMConfiguration extends Configuration { useControlFlow = Integer.parseInt(s); } + s = (String) configs.remove("system.communication.inQueueSize"); + if (s == null) { + inQueueSize = 200; + } else { + inQueueSize = Integer.parseInt(s); + } + rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome); } catch (Exception e) { @@ -292,6 +301,11 @@ public class TOMConfiguration extends Configuration { return verifyTimestamps; } + public int inQueueSize() { + return inQueueSize; + } + + public boolean isUseSenderThread() { return useSenderThread; }