From 902f7370fd2e945b7b0a9f36dfdb6bfbcdfb5bbb Mon Sep 17 00:00:00 2001 From: "liquidsnake@sapo.pt" Date: Mon, 22 Feb 2010 19:50:27 +0000 Subject: [PATCH] In this commit, I continued to work on the state transfer protocol. It is already capable of transfering leader information, and supply it to the delayed replica. This means the code is no longer hacked to always assume replica 0 is the leader. It also does garbage collection again. I also made some minor adjustements to some debbuging messages and the new demo --- .../ServerCommunicationSystem.java | 6 ++++-- .../executionmanager/ExecutionManager.java | 20 +++++++++---------- .../executionmanager/LeaderModule.java | 20 ++++--------------- .../smart/tom/core/DeliveryThread.java | 6 +++++- .../smart/tom/demo/RandomServer.java | 2 +- .../smart/tom/util/TOMConfiguration.java | 14 +++++++++++++ 6 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/navigators/smart/communication/ServerCommunicationSystem.java b/src/navigators/smart/communication/ServerCommunicationSystem.java index 77710d2a..b65e81e6 100644 --- a/src/navigators/smart/communication/ServerCommunicationSystem.java +++ b/src/navigators/smart/communication/ServerCommunicationSystem.java @@ -47,9 +47,9 @@ public class ServerCommunicationSystem extends Thread { public static int RR_MSG = 4; public static int RT_MSG = 5; - public static int IN_QUEUE_SIZE = 200; + //public static int IN_QUEUE_SIZE = 200; - private LinkedBlockingQueue inQueue = new LinkedBlockingQueue(IN_QUEUE_SIZE); + private LinkedBlockingQueue inQueue = null;//new LinkedBlockingQueue(IN_QUEUE_SIZE); protected MessageHandler messageHandler = new MessageHandler(); @@ -62,6 +62,8 @@ public class ServerCommunicationSystem extends Thread { public ServerCommunicationSystem(TOMConfiguration conf) throws Exception { super("Server CS"); + inQueue = new LinkedBlockingQueue(conf.inQueueSize()); + //create a new conf, with updated port number for servers TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(), Configuration.getHomeDir(), "hosts.config"); diff --git a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java index 32c07cea..b9026ba4 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java +++ b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java @@ -207,32 +207,32 @@ public final class ExecutionManager { int msgType = msg.getPaxosType(); boolean isRetrievingState = tomLayer.isRetrievingState(); - System.out.println("Esta a obter estado? " + isRetrievingState); - System.out.print("Recebi uma mensagem do eid " + consId + " do tipo "); - + String type = null; switch (msgType) { case MessageFactory.PROPOSE: - System.out.println("PROPOSE"); + type = "PROPOSE"; break; case MessageFactory.WEAK: - System.out.println("WEAK"); + type = "WEAK"; break; case MessageFactory.STRONG: - System.out.println("STRONG"); + type = "STRONG"; break; case MessageFactory.DECIDE: - System.out.println("DECIDE"); + type = "DECIDE"; break; case MessageFactory.FREEZE: - System.out.println("FREEZE"); + type = "FREEZE"; break; case MessageFactory.COLLECT: - System.out.println("COLLECT"); + type = "COLLECT"; break; default: - System.out.println(); + type = ""; break; } + System.out.println("Esta a obter estado? " + isRetrievingState); + Logger.println("Recebi uma mensagem do eid " + consId + " do tipo " + type); boolean canProcessTheMessage = false; diff --git a/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java b/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java index 32b4091d..34bf42c3 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java +++ b/src/navigators/smart/paxosatwar/executionmanager/LeaderModule.java @@ -102,7 +102,7 @@ public class LeaderModule { * @return The replica ID of the leader */ public int getLeader(int c, int r) { - /*** + /***/ List list = leaderInfos.get(c); if (list == null) { //there are no information for the execution c @@ -128,7 +128,7 @@ public class LeaderModule { } return -1; /***/ - return 0; + //return 0; } /** @@ -198,21 +198,9 @@ public class LeaderModule { } } - for (int c = cStart; c < cEnd; c++) { - - //List list = leaderInfos.get(c + 1); - - if (list == null) {//nunca vai acontecer isso!!! - //System.err.println("- Executing a code that wasn't supposed to be executed :-)"); - //System.err.println("- And we have some reports there is a bug here!"); - list = new LinkedList(); - leaderInfos.put(c + 1, list); - List rm = leaderInfos.remove(c); - ConsInfo ci = (ConsInfo) rm.get(rm.size() - 1); - list.add(new ConsInfo(0, ci.leaderId)); - } else { + for (int c = cStart; c <= cEnd; c++) { + leaderInfos.remove(c); - } } diff --git a/src/navigators/smart/tom/core/DeliveryThread.java b/src/navigators/smart/tom/core/DeliveryThread.java index c106e193..1796015f 100644 --- a/src/navigators/smart/tom/core/DeliveryThread.java +++ b/src/navigators/smart/tom/core/DeliveryThread.java @@ -98,6 +98,8 @@ public class DeliveryThread extends Thread { receiver.setState(state.getState()); + tomLayer.lm.addLeaderInfo(state.getLastCheckpointEid(), state.getLastCheckpointRound(), state.getLastCheckpointLeader()); + int lastCheckpointEid = state.getLastCheckpointEid(); int lastEid = state.getLastEid(); @@ -107,6 +109,8 @@ public class DeliveryThread extends Thread { byte[] batch = state.getMessageBatch(eid).batch; // take a batch + tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader); + // obtain an array of requests from the taken consensus BatchReader batchReader = new BatchReader(batch, conf.getUseSignatures()==1); @@ -327,7 +331,7 @@ public class DeliveryThread extends Thread { //define the last stable consensus... the stable consensus can //be removed from the leaderManager and the executionManager - /** + /**/ if (cons.getId() > 2) { int stableConsensus = cons.getId() - 3; diff --git a/src/navigators/smart/tom/demo/RandomServer.java b/src/navigators/smart/tom/demo/RandomServer.java index a4535aaf..b5e70126 100644 --- a/src/navigators/smart/tom/demo/RandomServer.java +++ b/src/navigators/smart/tom/demo/RandomServer.java @@ -92,7 +92,7 @@ public class RandomServer extends ServiceReplica { System.exit(-1); } - new CounterServer(Integer.parseInt(args[0])); + new RandomServer(Integer.parseInt(args[0])); } @Override diff --git a/src/navigators/smart/tom/util/TOMConfiguration.java b/src/navigators/smart/tom/util/TOMConfiguration.java index e8876ee1..67f35a81 100644 --- a/src/navigators/smart/tom/util/TOMConfiguration.java +++ b/src/navigators/smart/tom/util/TOMConfiguration.java @@ -32,6 +32,7 @@ public class TOMConfiguration extends Configuration { protected int replyVerificationTime; protected int maxBatchSize; protected int numberOfNonces; + protected int inQueueSize; protected boolean decideMessagesEnabled; protected boolean verifyTimestamps; protected boolean useSenderThread; @@ -69,6 +70,7 @@ public class TOMConfiguration extends Configuration { this.useSignatures = conf.useSignatures; this.checkpoint_period = conf.checkpoint_period; this.useControlFlow = conf.useControlFlow; + this.inQueueSize = inQueueSize; } /** Creates a new instance of TOMConfiguration */ @@ -239,6 +241,13 @@ public class TOMConfiguration extends Configuration { useControlFlow = Integer.parseInt(s); } + s = (String) configs.remove("system.communication.inQueueSize"); + if (s == null) { + inQueueSize = 200; + } else { + inQueueSize = Integer.parseInt(s); + } + rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome); } catch (Exception e) { @@ -292,6 +301,11 @@ public class TOMConfiguration extends Configuration { return verifyTimestamps; } + public int inQueueSize() { + return inQueueSize; + } + + public boolean isUseSenderThread() { return useSenderThread; } -- GitLab