From 238729c8c09047eb19006dd17c13b45e1b488e46 Mon Sep 17 00:00:00 2001 From: "liquidsnake@sapo.pt" Date: Tue, 13 Dec 2011 19:34:35 +0000 Subject: [PATCH] Fixed a bug on the state transfer protocol, that would render the leader change protocol from changing leader a second time. However, the leader change is still unstable when ciclying through replicas. I'm investigating the issue. This commit as a lot of debugging message related to the leader change protocol. --- .../smart/statemanagment/StateManager.java | 2 +- src/navigators/smart/tom/core/TOMLayer.java | 43 +++++++++++++++++-- .../smart/tom/core/timer/RequestsTimer.java | 31 ++++++++----- 3 files changed, 61 insertions(+), 15 deletions(-) diff --git a/src/navigators/smart/statemanagment/StateManager.java b/src/navigators/smart/statemanagment/StateManager.java index 260d7e31..33033c64 100644 --- a/src/navigators/smart/statemanagment/StateManager.java +++ b/src/navigators/smart/statemanagment/StateManager.java @@ -448,7 +448,7 @@ public class StateManager { if (recvState != null && haveState == 1 && currentRegency > -1) { lcManager.setLastReg(currentRegency); - lcManager.setLastReg(currentRegency); + lcManager.setNextReg(currentRegency); tomLayer.lm.setNewReg(currentRegency); Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!"); diff --git a/src/navigators/smart/tom/core/TOMLayer.java b/src/navigators/smart/tom/core/TOMLayer.java index 5a630d07..8cef4837 100644 --- a/src/navigators/smart/tom/core/TOMLayer.java +++ b/src/navigators/smart/tom/core/TOMLayer.java @@ -483,8 +483,11 @@ public final class TOMLayer extends Thread implements RequestReceiver { lcManager.lastregLock(); // ainda nao estou na fase de troca de lider? + + System.out.println("TIMEOUT!!! " + lcManager.getNextReg() + " == " + lcManager.getLastReg()); if (lcManager.getNextReg() == lcManager.getLastReg()) { + System.out.println("Vou passar a troca de lider para regencia " + lcManager.getNextReg()); lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp int regency = lcManager.getNextReg(); @@ -525,17 +528,21 @@ public final class TOMLayer extends Thread implements RequestReceiver { out.close(); bos.close(); + System.out.println("(1) MANDEI STOP para regencia " + regency); + // enviar mensagem STOP communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload)); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } finally { try { out.close(); bos.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } } @@ -553,6 +560,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { // este metodo e invocado aquando de um timeout ou da recepcao de uma mensagem STOP private void evaluateStops(int nextReg) { + System.out.println("Metodo evaluateStops com regencia " + nextReg); + System.out.println("(1) Numero de STOPS: " + lcManager.getStopsSize(nextReg)); ObjectOutputStream out = null; ByteArrayOutputStream bos = null; @@ -560,9 +569,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { lcManager.lastregLock(); lcManager.StopsLock(); + System.out.println("(1) " + (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorumF()) + " " + (lcManager.getNextReg() == lcManager.getLastReg())); // passar para a fase de troca de lider se já tiver recebido mais de f mensagens if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorumF() && lcManager.getNextReg() == lcManager.getLastReg()) { + System.out.println("STOPs suficientes para regencia " + nextReg); + + //requestsTimer.stopTimer(); + lcManager.setNextReg(lcManager.getLastReg() + 1); // definir proximo timestamp int regency = lcManager.getNextReg(); @@ -594,25 +608,32 @@ public final class TOMLayer extends Thread implements RequestReceiver { out.close(); bos.close(); + System.out.println("(2) MANDEI STOP para regencia " + regency); + // enviar mensagem STOP communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload)); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } finally { try { out.close(); bos.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } } } - + System.out.println("(2) Numero de STOPS: " + lcManager.getStopsSize(nextReg)); + + System.out.println("(2) " + (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorum2F()) + " " + (lcManager.getNextReg() > lcManager.getLastReg())); // posso passar para a fase de sincronizacao? if (lcManager.getStopsSize(nextReg) > this.reconfManager.getQuorum2F() && lcManager.getNextReg() > lcManager.getLastReg()) { + System.out.println("Posso passar para a proxima fase de troca de lider para regencia " + nextReg); lcManager.setLastReg(lcManager.getNextReg()); // definir ultimo timestamp lcManager.nextregUnlock(); @@ -624,6 +645,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { lcManager.removeStops(nextReg); lcManager.StopsUnlock(); + + //requestsTimer.startTimer(); int leader = regency % this.reconfManager.getCurrentViewN(); // novo lider int in = getInExec(); // eid a executar @@ -691,6 +714,7 @@ public final class TOMLayer extends Thread implements RequestReceiver { int[] b = new int[1]; b[0] = leader; + System.out.println("MANDEI STOPDATA para regencia " + regency); // enviar mensagem SYNC para o novo lider communication.send(b, new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOPDATA, regency, payload)); @@ -698,12 +722,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { //TODO: Voltar a ligar o timeout } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } finally { try { out.close(); bos.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } } @@ -764,10 +790,13 @@ public final class TOMLayer extends Thread implements RequestReceiver { case TOMUtil.STOP: // mensagens STOP { + System.out.println("Recebi STOP para regencia " + msg.getReg()); lcManager.lastregLock(); // esta mensagem e para a proxima mudanca de lider? if (msg.getReg() == lcManager.getLastReg() + 1) { + + System.out.println("Este stop e para a proxima regencia!"); lcManager.lastregUnlock(); @@ -794,8 +823,10 @@ public final class TOMLayer extends Thread implements RequestReceiver { bis.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } catch (ClassNotFoundException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } @@ -933,8 +964,10 @@ public final class TOMLayer extends Thread implements RequestReceiver { bis.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } catch (ClassNotFoundException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } @@ -995,6 +1028,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { out.close(); bos.close(); + System.out.println("MANDEI SYNC para regencia " + regency); + // enviar a mensagem CATCH-UP communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.SYNC, regency, payload)); @@ -1002,12 +1037,14 @@ public final class TOMLayer extends Thread implements RequestReceiver { finalise(regency, lastHighestEid, currentEid, signedCollects, propose, batchSize, true); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } finally { try { out.close(); bos.close(); } catch (IOException ex) { + ex.printStackTrace(); java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex); } } @@ -1098,8 +1135,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { } // acordar a thread que propoem valores na operacao normal - System.out.println(regency + " // WEAK: " + new BigInteger(r.propValueHash)); - // enviar mensagens WEAK para as outras replicas + System.out.println(regency + " // WEAK: " + new String(r.propValueHash)); + // enviar mensagens WEAK para as outras replicas communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), acceptor.getFactory().createWeak(currentEid, r.getNumber(), r.propValueHash)); diff --git a/src/navigators/smart/tom/core/timer/RequestsTimer.java b/src/navigators/smart/tom/core/timer/RequestsTimer.java index 79e12bcc..22df6185 100644 --- a/src/navigators/smart/tom/core/timer/RequestsTimer.java +++ b/src/navigators/smart/tom/core/timer/RequestsTimer.java @@ -66,6 +66,19 @@ public class RequestsTimer { this.timeout = this.reconfManager.getStaticConf().getRequestTimeout(); } + public void startTimer() { + if (rtTask == null) { + rtTask = new RequestTimerTask(); + timer.schedule(rtTask, timeout); + } + } + + public void stopTimer() { + if (rtTask != null) { + rtTask.cancel(); + rtTask = null; + } + } /** * Creates a timer for the given request * @param request Request to which the timer is being createf for @@ -74,10 +87,7 @@ public class RequestsTimer { //long startInstant = System.nanoTime(); rwLock.writeLock().lock(); watched.add(request); - if (watched.size() >= 1 && rtTask == null) { - rtTask = new RequestTimerTask(); - timer.schedule(rtTask, timeout); - } + if (watched.size() >= 1) startTimer(); rwLock.writeLock().unlock(); /* st1.store(System.nanoTime() - startInstant); @@ -96,10 +106,7 @@ public class RequestsTimer { public void unwatch(TOMMessage request) { //long startInstant = System.nanoTime(); rwLock.writeLock().lock(); - if (watched.remove(request) && watched.isEmpty() && rtTask != null) { - rtTask.cancel(); - rtTask = null; - } + if (watched.remove(request) && watched.isEmpty()) stopTimer(); rwLock.writeLock().unlock(); /* st2.store(System.nanoTime() - startInstant); @@ -159,11 +166,13 @@ public class RequestsTimer { if (!pendingRequests.isEmpty()) { System.out.println("Timeout for messages: " + pendingRequests); //tomLayer.requestTimeout(pendingRequests); + //stopTimer(); tomLayer.triggerTimeout(pendingRequests); } - - rtTask = new RequestTimerTask(); - timer.schedule(rtTask, timeout); + else { + rtTask = new RequestTimerTask(); + timer.schedule(rtTask, timeout); + } } else { rtTask = null; timer.purge(); -- GitLab