提交 238729c8 编写于 作者: L liquidsnake@sapo.pt

Fixed a bug on the state transfer protocol, that would render the leader...

Fixed a bug on the state transfer protocol, that would render the leader change protocol from changing leader a second time. However, the leader change is still unstable when ciclying through replicas. I'm investigating the issue.

This commit as a lot of debugging message related to the leader change protocol.
上级 9961ae8c
...@@ -448,7 +448,7 @@ public class StateManager { ...@@ -448,7 +448,7 @@ public class StateManager {
if (recvState != null && haveState == 1 && currentRegency > -1) { if (recvState != null && haveState == 1 && currentRegency > -1) {
lcManager.setLastReg(currentRegency); lcManager.setLastReg(currentRegency);
lcManager.setLastReg(currentRegency); lcManager.setNextReg(currentRegency);
tomLayer.lm.setNewReg(currentRegency); tomLayer.lm.setNewReg(currentRegency);
Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!"); Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
......
...@@ -483,8 +483,11 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -483,8 +483,11 @@ public final class TOMLayer extends Thread implements RequestReceiver {
lcManager.lastregLock(); lcManager.lastregLock();
// ainda nao estou na fase de troca de lider? // ainda nao estou na fase de troca de lider?
System.out.println("TIMEOUT!!! " + lcManager.getNextReg() + " == " + lcManager.getLastReg());
if (lcManager.getNextReg() == lcManager.getLastReg()) { if (lcManager.getNextReg() == lcManager.getLastReg()) {
System.out.println("Vou passar a troca de lider para regencia " + lcManager.getNextReg());
lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp
int regency = lcManager.getNextReg(); int regency = lcManager.getNextReg();
...@@ -525,17 +528,21 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -525,17 +528,21 @@ public final class TOMLayer extends Thread implements RequestReceiver {
out.close(); out.close();
bos.close(); bos.close();
System.out.println("(1) MANDEI STOP para regencia " + regency);
// enviar mensagem STOP // enviar mensagem STOP
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload)); new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload));
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} finally { } finally {
try { try {
out.close(); out.close();
bos.close(); bos.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
...@@ -553,6 +560,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -553,6 +560,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
// este metodo e invocado aquando de um timeout ou da recepcao de uma mensagem STOP // este metodo e invocado aquando de um timeout ou da recepcao de uma mensagem STOP
private void evaluateStops(int nextReg) { private void evaluateStops(int nextReg) {
System.out.println("Metodo evaluateStops com regencia " + nextReg);
System.out.println("(1) Numero de STOPS: " + lcManager.getStopsSize(nextReg));
ObjectOutputStream out = null; ObjectOutputStream out = null;
ByteArrayOutputStream bos = null; ByteArrayOutputStream bos = null;
...@@ -560,9 +569,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -560,9 +569,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
lcManager.lastregLock(); lcManager.lastregLock();
lcManager.StopsLock(); lcManager.StopsLock();
System.out.println("(1) " + (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorumF()) + " " + (lcManager.getNextReg() == lcManager.getLastReg()));
// passar para a fase de troca de lider se já tiver recebido mais de f mensagens // passar para a fase de troca de lider se já tiver recebido mais de f mensagens
if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorumF() && lcManager.getNextReg() == lcManager.getLastReg()) { if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorumF() && lcManager.getNextReg() == lcManager.getLastReg()) {
System.out.println("STOPs suficientes para regencia " + nextReg);
//requestsTimer.stopTimer();
lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp
int regency = lcManager.getNextReg(); int regency = lcManager.getNextReg();
...@@ -594,25 +608,32 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -594,25 +608,32 @@ public final class TOMLayer extends Thread implements RequestReceiver {
out.close(); out.close();
bos.close(); bos.close();
System.out.println("(2) MANDEI STOP para regencia " + regency);
// enviar mensagem STOP // enviar mensagem STOP
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload)); new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload));
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} finally { } finally {
try { try {
out.close(); out.close();
bos.close(); bos.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
} }
System.out.println("(2) Numero de STOPS: " + lcManager.getStopsSize(nextReg));
System.out.println("(2) " + (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorum2F()) + " " + (lcManager.getNextReg() > lcManager.getLastReg()));
// posso passar para a fase de sincronizacao? // posso passar para a fase de sincronizacao?
if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorum2F() && lcManager.getNextReg() > lcManager.getLastReg()) { if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorum2F() && lcManager.getNextReg() > lcManager.getLastReg()) {
System.out.println("Posso passar para a proxima fase de troca de lider para regencia " + nextReg);
lcManager.setLastReg(lcManager.getNextReg()); // definir ultimo timestamp lcManager.setLastReg(lcManager.getNextReg()); // definir ultimo timestamp
lcManager.nextregUnlock(); lcManager.nextregUnlock();
...@@ -624,6 +645,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -624,6 +645,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
lcManager.removeStops(nextReg); lcManager.removeStops(nextReg);
lcManager.StopsUnlock(); lcManager.StopsUnlock();
//requestsTimer.startTimer();
int leader = regency % this.reconfManager.getCurrentViewN(); // novo lider int leader = regency % this.reconfManager.getCurrentViewN(); // novo lider
int in = getInExec(); // eid a executar int in = getInExec(); // eid a executar
...@@ -691,6 +714,7 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -691,6 +714,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
int[] b = new int[1]; int[] b = new int[1];
b[0] = leader; b[0] = leader;
System.out.println("MANDEI STOPDATA para regencia " + regency);
// enviar mensagem SYNC para o novo lider // enviar mensagem SYNC para o novo lider
communication.send(b, communication.send(b,
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOPDATA, regency, payload)); new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOPDATA, regency, payload));
...@@ -698,12 +722,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -698,12 +722,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
//TODO: Voltar a ligar o timeout //TODO: Voltar a ligar o timeout
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} finally { } finally {
try { try {
out.close(); out.close();
bos.close(); bos.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
...@@ -764,10 +790,13 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -764,10 +790,13 @@ public final class TOMLayer extends Thread implements RequestReceiver {
case TOMUtil.STOP: // mensagens STOP case TOMUtil.STOP: // mensagens STOP
{ {
System.out.println("Recebi STOP para regencia " + msg.getReg());
lcManager.lastregLock(); lcManager.lastregLock();
// esta mensagem e para a proxima mudanca de lider? // esta mensagem e para a proxima mudanca de lider?
if (msg.getReg() == lcManager.getLastReg() + 1) { if (msg.getReg() == lcManager.getLastReg() + 1) {
System.out.println("Este stop e para a proxima regencia!");
lcManager.lastregUnlock(); lcManager.lastregUnlock();
...@@ -794,8 +823,10 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -794,8 +823,10 @@ public final class TOMLayer extends Thread implements RequestReceiver {
bis.close(); bis.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} catch (ClassNotFoundException ex) { } catch (ClassNotFoundException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
...@@ -933,8 +964,10 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -933,8 +964,10 @@ public final class TOMLayer extends Thread implements RequestReceiver {
bis.close(); bis.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} catch (ClassNotFoundException ex) { } catch (ClassNotFoundException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
...@@ -995,6 +1028,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -995,6 +1028,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
out.close(); out.close();
bos.close(); bos.close();
System.out.println("MANDEI SYNC para regencia " + regency);
// enviar a mensagem CATCH-UP // enviar a mensagem CATCH-UP
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.SYNC, regency, payload)); new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.SYNC, regency, payload));
...@@ -1002,12 +1037,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -1002,12 +1037,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
finalise(regency, lastHighestEid, currentEid, signedCollects, propose, batchSize, true); finalise(regency, lastHighestEid, currentEid, signedCollects, propose, batchSize, true);
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} finally { } finally {
try { try {
out.close(); out.close();
bos.close(); bos.close();
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace();
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
...@@ -1098,8 +1135,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { ...@@ -1098,8 +1135,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
} // acordar a thread que propoem valores na operacao normal } // acordar a thread que propoem valores na operacao normal
System.out.println(regency + " // WEAK: " + new BigInteger(r.propValueHash)); System.out.println(regency + " // WEAK: " + new String(r.propValueHash));
// enviar mensagens WEAK para as outras replicas // enviar mensagens WEAK para as outras replicas
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
acceptor.getFactory().createWeak(currentEid, r.getNumber(), r.propValueHash)); acceptor.getFactory().createWeak(currentEid, r.getNumber(), r.propValueHash));
......
...@@ -66,6 +66,19 @@ public class RequestsTimer { ...@@ -66,6 +66,19 @@ public class RequestsTimer {
this.timeout = this.reconfManager.getStaticConf().getRequestTimeout(); this.timeout = this.reconfManager.getStaticConf().getRequestTimeout();
} }
public void startTimer() {
if (rtTask == null) {
rtTask = new RequestTimerTask();
timer.schedule(rtTask, timeout);
}
}
public void stopTimer() {
if (rtTask != null) {
rtTask.cancel();
rtTask = null;
}
}
/** /**
* Creates a timer for the given request * Creates a timer for the given request
* @param request Request to which the timer is being createf for * @param request Request to which the timer is being createf for
...@@ -74,10 +87,7 @@ public class RequestsTimer { ...@@ -74,10 +87,7 @@ public class RequestsTimer {
//long startInstant = System.nanoTime(); //long startInstant = System.nanoTime();
rwLock.writeLock().lock(); rwLock.writeLock().lock();
watched.add(request); watched.add(request);
if (watched.size() >= 1 && rtTask == null) { if (watched.size() >= 1) startTimer();
rtTask = new RequestTimerTask();
timer.schedule(rtTask, timeout);
}
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
/* /*
st1.store(System.nanoTime() - startInstant); st1.store(System.nanoTime() - startInstant);
...@@ -96,10 +106,7 @@ public class RequestsTimer { ...@@ -96,10 +106,7 @@ public class RequestsTimer {
public void unwatch(TOMMessage request) { public void unwatch(TOMMessage request) {
//long startInstant = System.nanoTime(); //long startInstant = System.nanoTime();
rwLock.writeLock().lock(); rwLock.writeLock().lock();
if (watched.remove(request) && watched.isEmpty() && rtTask != null) { if (watched.remove(request) && watched.isEmpty()) stopTimer();
rtTask.cancel();
rtTask = null;
}
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
/* /*
st2.store(System.nanoTime() - startInstant); st2.store(System.nanoTime() - startInstant);
...@@ -159,11 +166,13 @@ public class RequestsTimer { ...@@ -159,11 +166,13 @@ public class RequestsTimer {
if (!pendingRequests.isEmpty()) { if (!pendingRequests.isEmpty()) {
System.out.println("Timeout for messages: " + pendingRequests); System.out.println("Timeout for messages: " + pendingRequests);
//tomLayer.requestTimeout(pendingRequests); //tomLayer.requestTimeout(pendingRequests);
//stopTimer();
tomLayer.triggerTimeout(pendingRequests); tomLayer.triggerTimeout(pendingRequests);
} }
else {
rtTask = new RequestTimerTask(); rtTask = new RequestTimerTask();
timer.schedule(rtTask, timeout); timer.schedule(rtTask, timeout);
}
} else { } else {
rtTask = null; rtTask = null;
timer.purge(); timer.purge();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册