From 792ce8891d83a4ad8b5b93caaa70f05c6c462cce Mon Sep 17 00:00:00 2001 From: "liquidsnake@sapo.pt" Date: Sat, 7 Apr 2012 17:28:35 +0000 Subject: [PATCH] Copied bug fixes from branch to main trunk, part 2 --- .../executionmanager/ExecutionManager.java | 7 +- .../smart/paxosatwar/roles/Acceptor.java | 5 +- .../smart/statemanagment/StateManager.java | 103 +++++++----------- src/navigators/smart/tom/core/TOMLayer.java | 46 ++++++-- 4 files changed, 82 insertions(+), 79 deletions(-) diff --git a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java index 00ae72f1..ab709295 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java +++ b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java @@ -147,7 +147,7 @@ public final class ExecutionManager { if (tomLayer.getInExec() != -1) { stoppedRound = getExecution(tomLayer.getInExec()).getLastRound(); //stoppedRound.getTimeoutTask().cancel(); - Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + stoppedRound.getExecution().getId()); + if (stoppedRound != null) Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + tomLayer.getInExec()); } stoppedMsgsLock.unlock(); } @@ -412,6 +412,7 @@ public final class ExecutionManager { outOfContextLock.lock(); /******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/ if (m.getPaxosType() == MessageFactory.PROPOSE) { + Logger.println("(ExecutionManager.addOutOfContextMessage) adding " + m); outOfContextProposes.put(m.getNumber(), m); } else { List messages = outOfContext.get(m.getNumber()); @@ -419,11 +420,9 @@ public final class ExecutionManager { messages = new LinkedList(); outOfContext.put(m.getNumber(), messages); } + Logger.println("(ExecutionManager.addOutOfContextMessage) adding " + m); messages.add(m); - if (outOfContext.size() % 1000 == 0) { - Logger.println("(ExecutionManager.addOutOfContextMessage) out-of-context size: " + outOfContext.size()); - } } /******* END OUTOFCONTEXT CRITICAL SECTION *******/ diff --git a/src/navigators/smart/paxosatwar/roles/Acceptor.java b/src/navigators/smart/paxosatwar/roles/Acceptor.java index 494b8917..0cf0b12e 100644 --- a/src/navigators/smart/paxosatwar/roles/Acceptor.java +++ b/src/navigators/smart/paxosatwar/roles/Acceptor.java @@ -235,7 +235,10 @@ public final class Acceptor { round.setStrong(me, value); - round.getExecution().getLearner().firstMessageProposed.strongSentTime = System.nanoTime(); + if(round.getExecution().getLearner().firstMessageProposed==null) { + + round.getExecution().getLearner().firstMessageProposed.strongSentTime = System.nanoTime(); + } communication.send(this.reconfManager.getCurrentViewOtherAcceptors(), factory.createStrong(eid, round.getNumber(), value)); diff --git a/src/navigators/smart/statemanagment/StateManager.java b/src/navigators/smart/statemanagment/StateManager.java index 86fed641..1068f402 100644 --- a/src/navigators/smart/statemanagment/StateManager.java +++ b/src/navigators/smart/statemanagment/StateManager.java @@ -74,6 +74,8 @@ public class StateManager { private LCManager lcManager; private ExecutionManager execManager; + private boolean appStateOnly; + public StateManager(ServerViewManager manager, TOMLayer tomLayer, DeliveryThread dt, LCManager lcManager, ExecutionManager execManager) { //******* EDUARDO BEGIN **************// @@ -98,6 +100,8 @@ public class StateManager { this.state = null; this.lastEid = -1; this.waitingEid = -1; + + appStateOnly = false; } public int getReplica() { @@ -319,6 +323,14 @@ public class StateManager { return senderStates.size(); } + public void requestAppState(int eid) { + setLastEID(eid + 1); + setWaiting(eid); + appStateOnly = true; + + requestState(); + } + public void analyzeState(int sender, int eid) { Logger.println("(TOMLayer.analyzeState) The state transfer protocol is enabled"); @@ -436,56 +448,6 @@ public class StateManager { } } - - public void temp(byte[] state) { - try { - - // serialize to byte array and return - ByteArrayInputStream bis = new ByteArrayInputStream(state); - ObjectInput in = new ObjectInputStream(bis); - BFTTableMap tableMap = (BFTTableMap) in.readObject(); - in.close(); - bis.close(); - showTables(tableMap); - - } catch (ClassNotFoundException ex) { - ex.printStackTrace(); - } catch (IOException ex) { - ex.printStackTrace(); - } - } - - public void showTables(BFTTableMap tableMap) { - try { - Map> tables = tableMap.getTables(); - Collection tableNames = tables.keySet(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(10000); - DataOutputStream dos = new DataOutputStream(baos); - for(String tableName : tableNames) { - System.out.println("[StateManager][showTables] Table name: " + tableName); - dos.writeUTF(tableName); - Map tableTmp = tables.get(tableName); - dos.writeInt(tableTmp.size()); - for(String key : tableTmp.keySet()) { - dos.writeUTF(key); - dos.flush(); - byte[] value = tableTmp.get(key); - dos.writeInt(value.length); - dos.write(value); - dos.flush(); - System.out.println("[StateManager][showTables] ---- Size of key '" + key + "': " + value.length); - } - System.out.println("[StateManager][showTables] ---- Count of rows for table '" + tableName + "': " + tableTmp.size()); - dos.flush(); - } - byte[] state = baos.toByteArray(); - System.out.println("[StateManager][showTables] Current byte array size: " + state.length); - //return state; - } catch (IOException ex) { - ex.printStackTrace(); - //return new byte[0]; - } - } public void SMReplyDeliver(SMMessage msg) { @@ -503,24 +465,25 @@ public class StateManager { int currentRegency = -1; int currentLeader = -1; View currentView = null; - addRegency(msg.getSender(), msg.getRegency()); - addLeader(msg.getSender(), msg.getLeader()); - addView(msg.getSender(), msg.getView()); - if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency(); - if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader(); - if (moreThan2F_Views(msg.getView())) { - currentView = msg.getView(); - if (currentView.isMember(SVManager.getStaticConf().getProcessId())) { - System.out.println("Not a member!"); + + if (!appStateOnly) { + addRegency(msg.getSender(), msg.getRegency()); + addLeader(msg.getSender(), msg.getLeader()); + addView(msg.getSender(), msg.getView()); + if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency(); + if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader(); + if (moreThan2F_Views(msg.getView())) { + currentView = msg.getView(); + if (!currentView.isMember(SVManager.getStaticConf().getProcessId())) { + System.out.println("Not a member!"); + } } + } else { + currentLeader = tomLayer.lm.getCurrentLeader(); + currentRegency = lcManager.getLastReg(); + currentView = SVManager.getCurrentView(); } - /*if (msg.getState().hasState()) { - System.out.println("(TOMLayer.SMReplyDeliver) Snapshot da replica " + msg.getSender() + " para o estado " + ((DefaultApplicationState) msg.getState()).getLastCheckpointEid()); - temp(msg.getState().getSerializedState()); - - }*/ - System.out.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!"); if (msg.getSender() == getReplica() && msg.getState().getSerializedState() != null) { @@ -583,7 +546,7 @@ public class StateManager { dt.update(recvState); //Deal with stopped messages that may come from synchronization phase - if (execManager.stopped()) { + if (!appStateOnly && execManager.stopped()) { Queue stoppedMsgs = execManager.getStoppedMsgs(); @@ -622,6 +585,10 @@ public class StateManager { tomLayer.requestsTimer.Enabled(true); tomLayer.requestsTimer.startTimer(); + if (appStateOnly) { + appStateOnly = false; + tomLayer.resumeLC(); + } //******* EDUARDO BEGIN **************// } else if (recvState == null && (SVManager.getCurrentViewN() / 2) < getReplies()) { //******* EDUARDO END **************// @@ -639,6 +606,10 @@ public class StateManager { //requestState(); if (stateTimer != null) stateTimer.cancel(); + + if (appStateOnly) { + requestState(); + } } else if (haveState == -1) { System.out.println("(TOMLayer.SMReplyDeliver) The replica from which I expected the state, sent one which doesn't match the hash of the others, or it never sent it at all"); diff --git a/src/navigators/smart/tom/core/TOMLayer.java b/src/navigators/smart/tom/core/TOMLayer.java index 7c5a6ef2..5bd6a0f5 100644 --- a/src/navigators/smart/tom/core/TOMLayer.java +++ b/src/navigators/smart/tom/core/TOMLayer.java @@ -1014,7 +1014,21 @@ public final class TOMLayer extends Thread implements RequestReceiver { } } } - + // temporary info to resume LC protocol + private int tempRegency = -1; + private LastEidData tempLastHighestEid = null; + private int tempCurrentEid = -1; + private HashSet tempSignedCollects = null; + private byte[] tempPropose = null; + private int tempBatchSize = -1; + private boolean tempIAmLeader = false; + + public void resumeLC() { + + finalise(tempRegency, tempLastHighestEid, tempCurrentEid, + tempSignedCollects, tempPropose, tempBatchSize, tempIAmLeader); + + } // this method is called on all replicas, and serves to verify and apply the // information sent in the catch-up message private void finalise(int regency, LastEidData lastHighestEid, @@ -1030,7 +1044,20 @@ public final class TOMLayer extends Thread implements RequestReceiver { //TODO: Case in which it is necessary to apply state transfer System.out.println("NEEDING TO USE STATE TRANSFER!! (" + lastHighestEid.getEid() + ")"); - + + tempRegency = regency; + tempLastHighestEid = lastHighestEid; + tempCurrentEid = currentEid; + tempSignedCollects = signedCollects; + tempPropose = propose; + tempBatchSize = batchSize; + tempIAmLeader = iAmLeader; + + execManager.getStoppedMsgs().add(acceptor.getFactory().createPropose(currentEid, 0, propose, null)); + stateManager.requestAppState(lastHighestEid.getEid()); + + return; + } else if (getLastExec() + 1 == lastHighestEid.getEid()) { // Is this replica still executing the last decided consensus? @@ -1048,10 +1075,10 @@ public final class TOMLayer extends Thread implements RequestReceiver { r.clear(); } - byte[] hash = computeHash(propose); + byte[] hash = computeHash(lastHighestEid.getEidDecision()); r.propValueHash = hash; - r.propValue = propose; - r.deserializedPropValue = checkProposedValue(propose); + r.propValue = lastHighestEid.getEidDecision(); + r.deserializedPropValue = checkProposedValue(lastHighestEid.getEidDecision()); exec.decided(r, hash); // pass the decision to the delivery thread } byte[] tmpval = null; @@ -1093,9 +1120,12 @@ public final class TOMLayer extends Thread implements RequestReceiver { r.propValue = tmpval; r.deserializedPropValue = checkProposedValue(tmpval); - if(exec.getLearner().firstMessageProposed == null) - exec.getLearner().firstMessageProposed = r.deserializedPropValue[0]; - + if(exec.getLearner().firstMessageProposed == null) { + if (r.deserializedPropValue != null && + r.deserializedPropValue.length > 0) + exec.getLearner().firstMessageProposed = r.deserializedPropValue[0]; + else exec.getLearner().firstMessageProposed = new TOMMessage(); // to avoid null pointer + } r.setWeak(me, hash); lm.setNewReg(regency); -- GitLab