提交 951602fc 编写于 作者: L liquidsnake@sapo.pt

In this commit, I continued to work on the state transfer protocol.I finally...

In this commit, I continued to work on the state transfer protocol.I finally tested the protocol in the case of a crash of a replica. I was able to make the replica recover the state on those circunstances.  However, the replica will still only requests the state after it passes the highmark value. I started working on an optimization to request the state before it reaches that highmark, but its still not completed.
上级 902f7370
......@@ -62,7 +62,7 @@ public class ServerCommunicationSystem extends Thread {
public ServerCommunicationSystem(TOMConfiguration conf) throws Exception {
super("Server CS");
inQueue = new LinkedBlockingQueue<SystemMessage>(conf.inQueueSize());
inQueue = new LinkedBlockingQueue<SystemMessage>(conf.getInQueueSize());
//create a new conf, with updated port number for servers
TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(),
......
......@@ -75,6 +75,9 @@ public final class ExecutionManager {
private TOMLayer tomLayer; // TOM layer associated with this execution manager
private long initialTimeout; // initial timeout for rounds
private int paxosHighMark; // Paxos high mark for consensus instances
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
private int revivalHighMark; // Paxos high mark for consensus instances when this replica EID equals 0
/******************************************************************/
/**
* Creates a new instance of ExecutionManager
......@@ -106,6 +109,9 @@ public final class ExecutionManager {
public void setTOMLayer(TOMLayer tom) {
this.tomLayer = tom;
this.paxosHighMark = tom.getConf().getPaxosHighMark();
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
this.revivalHighMark = tom.getConf().getRevivalHighMark();
/******************************************************************/
}
/**
......@@ -204,6 +210,9 @@ public final class ExecutionManager {
outOfContextLock.lock();
int consId = msg.getNumber();
int lastConsId = tomLayer.getLastExec();
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
int currentConsId = tomLayer.getInExec();
/******************************************************************/
int msgType = msg.getPaxosType();
boolean isRetrievingState = tomLayer.isRetrievingState();
......@@ -244,7 +253,7 @@ public final class ExecutionManager {
// enquanto a replica esta a receber o estado das outras e a actualizar-se
isRetrievingState ||
//(currentConsId == 0 && consId > lastConsId && (consId < (lastConsId + revivalHighMark))) ||
/******************************************************************/
(consId > lastConsId && (consId < (lastConsId + paxosHighMark)))
......@@ -282,7 +291,14 @@ public final class ExecutionManager {
Logger.println("(ExecutionManager.checkLimits) message for execution "+consId+" can be processed");
canProcessTheMessage = true;
}
} else if (consId >= (lastConsId + paxosHighMark)) { // Does this message exceeds the high mark?
} else if (
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO *
(currentConsId == 0 && consId >= (lastConsId + revivalHighMark)) ||
/******************************************************************/
(consId >= (lastConsId + paxosHighMark))
) { // Does this message exceeds the high mark?
/**
System.out.println("##################################################################################");
......@@ -298,7 +314,7 @@ public final class ExecutionManager {
System.out.println("<Out of highmark>");
Logger.println("(ExecutionManager.checkLimits) adding message for execution "+consId+" to out of context");
addOutOfContextMessage(msg);
tomLayer.requestState(me, otherAcceptors, msg.getSender(), consId);
tomLayer.requestState(me, getOtherAcceptors(), msg.getSender(), consId);
/******************************************************************/
}
outOfContextLock.unlock();
......
......@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import navigators.smart.paxosatwar.Consensus;
......@@ -83,10 +84,12 @@ public class DeliveryThread extends Thread {
public void deliverLock() {
deliverLock.lock();
Logger.println("Obti o deliver lock");
}
public void deliverUnlock() {
deliverLock.unlock();
Logger.println("Soltei o deliver lock");
}
public void canDeliver() {
......@@ -95,7 +98,9 @@ public class DeliveryThread extends Thread {
public void update(TransferableState state) {
//deliverLock.lock();
System.out.println("Vou actualizar-me");
receiver.setState(state.getState());
tomLayer.lm.addLeaderInfo(state.getLastCheckpointEid(), state.getLastCheckpointRound(), state.getLastCheckpointLeader());
......@@ -247,7 +252,7 @@ public class DeliveryThread extends Thread {
while (true) {
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
deliverLock.lock();
deliverLock();
//if (tomLayer != null) {
while (tomLayer.isRetrievingState()) {
......@@ -257,7 +262,18 @@ public class DeliveryThread extends Thread {
/******************************************************************/
try {
Consensus cons = decided.take(); // take a decided consensus
//Consensus cons = decided.take(); // take a decided consensus
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
Logger.println("(DeliveryThread.run) Waiting for a consensus to be delivered.");
Consensus cons = decided.poll(1500, TimeUnit.MILLISECONDS); // take a decided consensus
if (cons == null) {
Logger.println("(DeliveryThread.run) Timeout while waiting for a consensus, starting over.");
deliverUnlock();
continue;
}
Logger.println("(DeliveryThread.run) A consensus was delivered.");
/******************************************************************/
startTime = System.currentTimeMillis();
//TODO: avoid the case in which the received valid proposal is
......@@ -392,7 +408,7 @@ public class DeliveryThread extends Thread {
e.printStackTrace(System.out);
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
deliverLock.unlock();
deliverUnlock();
/******************************************************************/
}
......
......@@ -1135,10 +1135,19 @@ public final class TOMLayer extends Thread implements RequestReceiver {
lockState.unlock();
System.out.println("Desbloqueei o lock para o log do estado");
dt.deliverLock();
System.out.println("Bloqueei o lock entre esta thread e a delivery thread");
ot.OutOfContextLock();
System.out.println("Bloqueei o lock entre esta thread e a out of context thread");
stateManager.setWaiting(-1);
System.out.println("Ja nao estou a espera de nenhum estado, e vou actualizar-me");
dt.update(state);
dt.canDeliver();
......
......@@ -24,6 +24,8 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import navigators.smart.tom.ServiceProxy;
......@@ -43,9 +45,15 @@ public class CounterClient {
ServiceProxy counterProxy = new ServiceProxy(Integer.parseInt(args[0]));
int i=0;
int inc = Integer.parseInt(args[1]);
//sends 1000 requests to replicas and then terminates
while(i<1000){
int inc = Integer.parseInt(args[1]);
while(i<50){
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(CounterClient.class.getName()).log(Level.SEVERE, null, ex);
}
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
new DataOutputStream(out).writeInt(inc);
......
......@@ -29,6 +29,7 @@ public class TOMConfiguration extends Configuration {
protected int freezeInitialTimeout;
protected int tomPeriod;
protected int paxosHighMark;
protected int revivalHighMark;
protected int replyVerificationTime;
protected int maxBatchSize;
protected int numberOfNonces;
......@@ -56,6 +57,7 @@ public class TOMConfiguration extends Configuration {
this.freezeInitialTimeout = conf.freezeInitialTimeout;
this.tomPeriod = conf.tomPeriod;
this.paxosHighMark = conf.paxosHighMark;
this.revivalHighMark = conf.revivalHighMark;
this.replyVerificationTime = conf.replyVerificationTime;
this.maxBatchSize = conf.maxBatchSize;
this.numberOfNonces = conf.numberOfNonces;
......@@ -70,7 +72,7 @@ public class TOMConfiguration extends Configuration {
this.useSignatures = conf.useSignatures;
this.checkpoint_period = conf.checkpoint_period;
this.useControlFlow = conf.useControlFlow;
this.inQueueSize = inQueueSize;
this.inQueueSize = conf.inQueueSize;
}
/** Creates a new instance of TOMConfiguration */
......@@ -130,7 +132,7 @@ public class TOMConfiguration extends Configuration {
s = (String) configs.remove("system.totalordermulticast.highMark");
if (s == null) {
paxosHighMark = 10;
paxosHighMark = 10000;
} else {
paxosHighMark = Integer.parseInt(s);
if (paxosHighMark < 10) {
......@@ -138,6 +140,16 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.totalordermulticast.revival_highMark");
if (s == null) {
revivalHighMark = 10;
} else {
revivalHighMark = Integer.parseInt(s);
if (revivalHighMark < 1) {
revivalHighMark = 1;
}
}
s = (String) configs.remove("system.totalordermulticast.maxbatchsize");
if (s == null) {
maxBatchSize = 100;
......@@ -245,7 +257,12 @@ public class TOMConfiguration extends Configuration {
if (s == null) {
inQueueSize = 200;
} else {
inQueueSize = Integer.parseInt(s);
if (inQueueSize < 1) {
inQueueSize = 1;
}
}
rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome);
......@@ -289,6 +306,10 @@ public class TOMConfiguration extends Configuration {
return paxosHighMark;
}
public int getRevivalHighMark() {
return revivalHighMark;
}
public int getMaxBatchSize() {
return maxBatchSize;
}
......@@ -301,7 +322,7 @@ public class TOMConfiguration extends Configuration {
return verifyTimestamps;
}
public int inQueueSize() {
public int getInQueueSize() {
return inQueueSize;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册