提交 902f7370 编写于 作者: L liquidsnake@sapo.pt

In this commit, I continued to work on the state transfer protocol. It is...

In this commit, I continued to work on the state transfer protocol. It is already capable of transfering leader information, and supply it to the delayed replica. This means the code is no longer hacked to always assume replica 0 is the leader. It also does garbage collection again.

I also made some minor adjustements to some debbuging messages and the new demo
上级 e9f9ff68
...@@ -47,9 +47,9 @@ public class ServerCommunicationSystem extends Thread { ...@@ -47,9 +47,9 @@ public class ServerCommunicationSystem extends Thread {
public static int RR_MSG = 4; public static int RR_MSG = 4;
public static int RT_MSG = 5; public static int RT_MSG = 5;
public static int IN_QUEUE_SIZE = 200; //public static int IN_QUEUE_SIZE = 200;
private LinkedBlockingQueue<SystemMessage> inQueue = new LinkedBlockingQueue<SystemMessage>(IN_QUEUE_SIZE); private LinkedBlockingQueue<SystemMessage> inQueue = null;//new LinkedBlockingQueue<SystemMessage>(IN_QUEUE_SIZE);
protected MessageHandler messageHandler = new MessageHandler(); protected MessageHandler messageHandler = new MessageHandler();
...@@ -62,6 +62,8 @@ public class ServerCommunicationSystem extends Thread { ...@@ -62,6 +62,8 @@ public class ServerCommunicationSystem extends Thread {
public ServerCommunicationSystem(TOMConfiguration conf) throws Exception { public ServerCommunicationSystem(TOMConfiguration conf) throws Exception {
super("Server CS"); super("Server CS");
inQueue = new LinkedBlockingQueue<SystemMessage>(conf.inQueueSize());
//create a new conf, with updated port number for servers //create a new conf, with updated port number for servers
TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(), TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(),
Configuration.getHomeDir(), "hosts.config"); Configuration.getHomeDir(), "hosts.config");
......
...@@ -207,32 +207,32 @@ public final class ExecutionManager { ...@@ -207,32 +207,32 @@ public final class ExecutionManager {
int msgType = msg.getPaxosType(); int msgType = msg.getPaxosType();
boolean isRetrievingState = tomLayer.isRetrievingState(); boolean isRetrievingState = tomLayer.isRetrievingState();
System.out.println("Esta a obter estado? " + isRetrievingState); String type = null;
System.out.print("Recebi uma mensagem do eid " + consId + " do tipo ");
switch (msgType) { switch (msgType) {
case MessageFactory.PROPOSE: case MessageFactory.PROPOSE:
System.out.println("PROPOSE"); type = "PROPOSE";
break; break;
case MessageFactory.WEAK: case MessageFactory.WEAK:
System.out.println("WEAK"); type = "WEAK";
break; break;
case MessageFactory.STRONG: case MessageFactory.STRONG:
System.out.println("STRONG"); type = "STRONG";
break; break;
case MessageFactory.DECIDE: case MessageFactory.DECIDE:
System.out.println("DECIDE"); type = "DECIDE";
break; break;
case MessageFactory.FREEZE: case MessageFactory.FREEZE:
System.out.println("FREEZE"); type = "FREEZE";
break; break;
case MessageFactory.COLLECT: case MessageFactory.COLLECT:
System.out.println("COLLECT"); type = "COLLECT";
break; break;
default: default:
System.out.println(); type = "";
break; break;
} }
System.out.println("Esta a obter estado? " + isRetrievingState);
Logger.println("Recebi uma mensagem do eid " + consId + " do tipo " + type);
boolean canProcessTheMessage = false; boolean canProcessTheMessage = false;
......
...@@ -102,7 +102,7 @@ public class LeaderModule { ...@@ -102,7 +102,7 @@ public class LeaderModule {
* @return The replica ID of the leader * @return The replica ID of the leader
*/ */
public int getLeader(int c, int r) { public int getLeader(int c, int r) {
/*** /***/
List<ConsInfo> list = leaderInfos.get(c); List<ConsInfo> list = leaderInfos.get(c);
if (list == null) { if (list == null) {
//there are no information for the execution c //there are no information for the execution c
...@@ -128,7 +128,7 @@ public class LeaderModule { ...@@ -128,7 +128,7 @@ public class LeaderModule {
} }
return -1; return -1;
/***/ /***/
return 0; //return 0;
} }
/** /**
...@@ -198,21 +198,9 @@ public class LeaderModule { ...@@ -198,21 +198,9 @@ public class LeaderModule {
} }
} }
for (int c = cStart; c < cEnd; c++) { for (int c = cStart; c <= cEnd; c++) {
//List list = leaderInfos.get(c + 1);
if (list == null) {//nunca vai acontecer isso!!!
//System.err.println("- Executing a code that wasn't supposed to be executed :-)");
//System.err.println("- And we have some reports there is a bug here!");
list = new LinkedList();
leaderInfos.put(c + 1, list);
List rm = leaderInfos.remove(c);
ConsInfo ci = (ConsInfo) rm.get(rm.size() - 1);
list.add(new ConsInfo(0, ci.leaderId));
} else {
leaderInfos.remove(c); leaderInfos.remove(c);
}
} }
......
...@@ -98,6 +98,8 @@ public class DeliveryThread extends Thread { ...@@ -98,6 +98,8 @@ public class DeliveryThread extends Thread {
receiver.setState(state.getState()); receiver.setState(state.getState());
tomLayer.lm.addLeaderInfo(state.getLastCheckpointEid(), state.getLastCheckpointRound(), state.getLastCheckpointLeader());
int lastCheckpointEid = state.getLastCheckpointEid(); int lastCheckpointEid = state.getLastCheckpointEid();
int lastEid = state.getLastEid(); int lastEid = state.getLastEid();
...@@ -107,6 +109,8 @@ public class DeliveryThread extends Thread { ...@@ -107,6 +109,8 @@ public class DeliveryThread extends Thread {
byte[] batch = state.getMessageBatch(eid).batch; // take a batch byte[] batch = state.getMessageBatch(eid).batch; // take a batch
tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, state.getMessageBatch(eid).leader);
// obtain an array of requests from the taken consensus // obtain an array of requests from the taken consensus
BatchReader batchReader = new BatchReader(batch, conf.getUseSignatures()==1); BatchReader batchReader = new BatchReader(batch, conf.getUseSignatures()==1);
...@@ -327,7 +331,7 @@ public class DeliveryThread extends Thread { ...@@ -327,7 +331,7 @@ public class DeliveryThread extends Thread {
//define the last stable consensus... the stable consensus can //define the last stable consensus... the stable consensus can
//be removed from the leaderManager and the executionManager //be removed from the leaderManager and the executionManager
/** /**/
if (cons.getId() > 2) { if (cons.getId() > 2) {
int stableConsensus = cons.getId() - 3; int stableConsensus = cons.getId() - 3;
......
...@@ -92,7 +92,7 @@ public class RandomServer extends ServiceReplica { ...@@ -92,7 +92,7 @@ public class RandomServer extends ServiceReplica {
System.exit(-1); System.exit(-1);
} }
new CounterServer(Integer.parseInt(args[0])); new RandomServer(Integer.parseInt(args[0]));
} }
@Override @Override
......
...@@ -32,6 +32,7 @@ public class TOMConfiguration extends Configuration { ...@@ -32,6 +32,7 @@ public class TOMConfiguration extends Configuration {
protected int replyVerificationTime; protected int replyVerificationTime;
protected int maxBatchSize; protected int maxBatchSize;
protected int numberOfNonces; protected int numberOfNonces;
protected int inQueueSize;
protected boolean decideMessagesEnabled; protected boolean decideMessagesEnabled;
protected boolean verifyTimestamps; protected boolean verifyTimestamps;
protected boolean useSenderThread; protected boolean useSenderThread;
...@@ -69,6 +70,7 @@ public class TOMConfiguration extends Configuration { ...@@ -69,6 +70,7 @@ public class TOMConfiguration extends Configuration {
this.useSignatures = conf.useSignatures; this.useSignatures = conf.useSignatures;
this.checkpoint_period = conf.checkpoint_period; this.checkpoint_period = conf.checkpoint_period;
this.useControlFlow = conf.useControlFlow; this.useControlFlow = conf.useControlFlow;
this.inQueueSize = inQueueSize;
} }
/** Creates a new instance of TOMConfiguration */ /** Creates a new instance of TOMConfiguration */
...@@ -239,6 +241,13 @@ public class TOMConfiguration extends Configuration { ...@@ -239,6 +241,13 @@ public class TOMConfiguration extends Configuration {
useControlFlow = Integer.parseInt(s); useControlFlow = Integer.parseInt(s);
} }
s = (String) configs.remove("system.communication.inQueueSize");
if (s == null) {
inQueueSize = 200;
} else {
inQueueSize = Integer.parseInt(s);
}
rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome); rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome);
} catch (Exception e) { } catch (Exception e) {
...@@ -292,6 +301,11 @@ public class TOMConfiguration extends Configuration { ...@@ -292,6 +301,11 @@ public class TOMConfiguration extends Configuration {
return verifyTimestamps; return verifyTimestamps;
} }
public int inQueueSize() {
return inQueueSize;
}
public boolean isUseSenderThread() { public boolean isUseSenderThread() {
return useSenderThread; return useSenderThread;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册