diff --git a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java index 82912801a4a85b65351a37b18fb204ea385c9cea..00ae72f1f529faf357eca9a42c2f3d560fd474a0 100644 --- a/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java +++ b/src/navigators/smart/paxosatwar/executionmanager/ExecutionManager.java @@ -221,7 +221,7 @@ public final class ExecutionManager { Logger.println("(ExecutionManager.checkLimits) Message for execution " + msg.getNumber() + " is out of context, adding it to out of context set"); - + //System.out.println("(ExecutionManager.checkLimits) Message for execution " + // msg.getNumber() + " is out of context, adding it to out of context set; isRetrievingState="+isRetrievingState); @@ -230,7 +230,7 @@ public final class ExecutionManager { } else { //can process! Logger.println("(ExecutionManager.checkLimits) message for execution " + msg.getNumber() + " can be processed"); - + //Logger.debug = false; canProcessTheMessage = true; } diff --git a/src/navigators/smart/paxosatwar/roles/Acceptor.java b/src/navigators/smart/paxosatwar/roles/Acceptor.java index cbf86a09ef2e8629463db006b12908ca272646eb..494b89172c9669026b4a063fee01bd7ae6ea1f8a 100644 --- a/src/navigators/smart/paxosatwar/roles/Acceptor.java +++ b/src/navigators/smart/paxosatwar/roles/Acceptor.java @@ -93,8 +93,11 @@ public final class Acceptor { */ public final void deliver(PaxosMessage msg) { if (manager.checkLimits(msg)) { + Logger.println("processing paxos msg with id " + msg.getNumber()); + //Logger.debug = false; processMessage(msg); } else { + Logger.println("out of context msg with id " + msg.getNumber()); tomLayer.processOutOfContext(); } } diff --git a/src/navigators/smart/statemanagment/StateManager.java b/src/navigators/smart/statemanagment/StateManager.java index 50145b035f003fdd1edef3b7f762fbdcd25dfa8f..00513912606c21dfa60efebaaa63115c2c0beee4 100644 --- a/src/navigators/smart/statemanagment/StateManager.java +++ b/src/navigators/smart/statemanagment/StateManager.java @@ -49,7 +49,7 @@ public class StateManager { private HashSet senderRegencies = null; private HashSet senderLeaders = null; - private ReentrantLock lockState = new ReentrantLock(); + //private ReentrantLock lockState = new ReentrantLock(); private ReentrantLock lockTimer = new ReentrantLock(); private Timer stateTimer = null; @@ -160,6 +160,14 @@ public class StateManager { senderRegencies.clear(); } + public void emptyViews() { + senderViews.clear(); + } + + public void emptyLeaders() { + senderLeaders.clear(); + } + public void emptyRegencies(int regency) { for (SenderRegency m : senderRegencies) if (m.regency <= regency) senderRegencies.remove(m); @@ -263,11 +271,14 @@ public class StateManager { senderStates.toArray(st); int count = 0; - for (int i = 0; i < st.length; i++) { + for (int i = 0; i < st.length; i++, count = 0) { - for (int j = i; j < st.length; j++) { + for (int j = 0; j < st.length; j++) { + System.out.println("PID " + st[j].sender + " sent EID " + st[j].state.getLastEid()); + //System.out.println(st[i].state.equals(st[j].state) + " && " + st[j].state.hasState()); if (st[i].state.equals(st[j].state) && st[j].state.hasState()) count++; + System.out.println("Count: " + count); //******* EDUARDO BEGIN **************// if (count > SVManager.getCurrentViewF()) return st[j].state; //******* EDUARDO END **************// @@ -366,6 +377,10 @@ public class StateManager { //setWaiting(-1); changeReplica(); emptyStates(); + emptyEIDs(); + emptyLeaders(); + emptyRegencies(); + emptyViews(); setReplicaState(null); requestState(); @@ -382,7 +397,7 @@ public class StateManager { System.out.println("(TOMLayer.SMRequestDeliver) The state transfer protocol is enabled"); - lockState.lock(); + //lockState.lock(); System.out.println("(TOMLayer.SMRequestDeliver) I received a state request for EID " + msg.getEid() + " from replica " + msg.getSender()); @@ -392,7 +407,7 @@ public class StateManager { //TransferableState thisState = getLog().getTransferableState(msg.getEid(), sendState); ApplicationState thisState = dt.getRecoverer().getState(msg.getEid(), sendState); - lockState.unlock(); + //lockState.unlock(); if (thisState == null) { System.out.println("(TOMLayer.SMRequestDeliver) I don't have the state requested :-("); @@ -437,7 +452,7 @@ public class StateManager { if (moreThan2F_Views(msg.getView())) { currentView = msg.getView(); if (currentView.isMember(SVManager.getStaticConf().getProcessId())) { - System.out.println("Not a member anymore!"); + System.out.println("Not a member!"); } } @@ -453,9 +468,11 @@ public class StateManager { if (moreThanF_Replies()) { - System.out.println("(TOMLayer.SMReplyDeliver) I have at least " + SVManager.getCurrentViewF() + " replies!"); + System.out.println("(TOMLayer.SMReplyDeliver) I have more than " + SVManager.getCurrentViewF() + " replies!"); + System.out.println("[StateManager.getValidHash]"); ApplicationState recvState = getValidHash(); + System.out.println("[/StateManager.getValidHash]"); int haveState = 0; if (getReplicaState() != null) { @@ -493,7 +510,10 @@ public class StateManager { setWaiting(-1); - dt.update(msg.getEid(), recvState); + //Logger.debug = true; + + System.out.println("EID requested " + msg.getEid()); + dt.update(recvState); //Deal with stopped messages that may come from synchronization phase if (execManager.stopped()) { @@ -502,7 +522,7 @@ public class StateManager { for (PaxosMessage stopped : stoppedMsgs) { - if (stopped.getNumber() > msg.getEid()) + if (stopped.getNumber() > recvState.getLastEid() /*msg.getEid()*/) execManager.addOutOfContextMessage(stopped); } @@ -510,6 +530,7 @@ public class StateManager { execManager.restart(); } + Logger.println("Processing out of context messages"); tomLayer.processOutOfContext(); if (SVManager.getCurrentViewId() != currentView.getId()) { @@ -523,6 +544,10 @@ public class StateManager { dt.deliverUnlock(); emptyStates(); + emptyEIDs(); + emptyLeaders(); + emptyRegencies(); + emptyViews(); setReplicaState(null); System.out.println("I updated the state!"); @@ -539,6 +564,10 @@ public class StateManager { setWaiting(-1); emptyStates(); + emptyEIDs(); + emptyLeaders(); + emptyRegencies(); + emptyViews(); setReplicaState(null); //requestState(); @@ -550,6 +579,10 @@ public class StateManager { //setWaiting(-1); changeReplica(); emptyStates(); + emptyEIDs(); + emptyLeaders(); + emptyRegencies(); + emptyViews(); setReplicaState(null); requestState(); diff --git a/src/navigators/smart/tom/core/DeliveryThread.java b/src/navigators/smart/tom/core/DeliveryThread.java index 0c9cea9713f8a1601adca9b1094e4548540ab5b8..1c1b1b4d53f37e8ebcb781a2f4b2e52b4f315297 100644 --- a/src/navigators/smart/tom/core/DeliveryThread.java +++ b/src/navigators/smart/tom/core/DeliveryThread.java @@ -123,11 +123,12 @@ public final class DeliveryThread extends Thread { canDeliver.signalAll(); } - public void update(int eid, ApplicationState state) { + public void update(ApplicationState state) { - int lastEid = recoverer.setState(eid, state); + int lastEid = recoverer.setState(state); //set this consensus as the last executed + System.out.println("Setting last EID to " + lastEid); tomLayer.setLastExec(lastEid); //define the last stable consensus... the stable consensus can diff --git a/src/navigators/smart/tom/demo/counter/CounterServer.java b/src/navigators/smart/tom/demo/counter/CounterServer.java index b34c7cbb473216e6e8ea5a1ae972726761bad5d9..8943e57f31a8b710a26c6e354e1a7bf0aab304ac 100644 --- a/src/navigators/smart/tom/demo/counter/CounterServer.java +++ b/src/navigators/smart/tom/demo/counter/CounterServer.java @@ -24,9 +24,13 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.locks.ReentrantLock; import navigators.smart.statemanagment.ApplicationState; import navigators.smart.tom.MessageContext; import navigators.smart.tom.ServiceReplica; +import navigators.smart.tom.server.BatchExecutable; import navigators.smart.tom.server.SingleExecutable; import navigators.smart.tom.server.Recoverable; @@ -35,14 +39,24 @@ import navigators.smart.tom.server.Recoverable; * Example replica that implements a BFT replicated service (a counter). * */ -public final class CounterServer implements SingleExecutable, Recoverable { +public final class CounterServer implements BatchExecutable, Recoverable { - private ServiceReplica replica; + private ServiceReplica replica; private int counter = 0; private int iterations = 0; + private MessageDigest md; + private ReentrantLock stateLock = new ReentrantLock(); + private int lastEid = -1; + public CounterServer(int id) { replica = new ServiceReplica(id, this, this); + + try { + md = MessageDigest.getInstance("MD5"); // TODO: shouldn't it be SHA? + } catch (NoSuchAlgorithmException ex) { + ex.printStackTrace(); + } } //******* EDUARDO BEGIN **************// @@ -53,12 +67,23 @@ public final class CounterServer implements SingleExecutable, Recoverable { @Override - public byte[] executeOrdered(byte[] command, MessageContext msgCtx) { - return execute(command,msgCtx); + public byte[][] executeBatch(byte[][] commands, MessageContext[] msgCtxs) { + + stateLock.lock(); + + byte [][] replies = new byte[commands.length][]; + for (int i = 0; i < commands.length; i++) { + replies[i] = execute(commands[i],msgCtxs[i]); + } + + stateLock.unlock(); + + return replies; } @Override public byte[] executeUnordered(byte[] command, MessageContext msgCtx) { + return execute(command,msgCtx); } @@ -68,6 +93,8 @@ public final class CounterServer implements SingleExecutable, Recoverable { int increment = new DataInputStream(new ByteArrayInputStream(command)).readInt(); //System.out.println("read-only request: "+(msgCtx.getConsensusId() == -1)); counter += increment; + lastEid = msgCtx.getConsensusId(); + if (msgCtx.getConsensusId() == -1) System.out.println("(" + iterations + ") Counter incremented: " + counter); else @@ -96,43 +123,47 @@ public final class CounterServer implements SingleExecutable, Recoverable { } /** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */ - public byte[] getState() { - //System.out.println("reading counter: "+this.counter); + + @Override + public ApplicationState getState(int eid, boolean sendState) { + + stateLock.lock(); + + if (eid == -1 || eid > lastEid) return new CounterState(); byte[] b = new byte[4]; + byte[] d = null; + for (int i = 0; i < 4; i++) { int offset = (b.length - 1 - i) * 8; b[i] = (byte) ((counter >>> offset) & 0xFF); } - return b; - //throw new UnsupportedOperationException("Not supported yet."); + stateLock.unlock(); + + d = md.digest(b); + + return new CounterState(lastEid, (sendState ? b : null), d); } - public void setState(byte[] state) { + @Override + public int setState(ApplicationState state) { int value = 0; for (int i = 0; i < 4; i++) { int shift = (4 - 1 - i) * 8; - value += (state[i] & 0x000000FF) << shift; + value += (state.getSerializedState()[i] & 0x000000FF) << shift; } //System.out.println("setting counter to: "+value); + stateLock.lock(); this.counter = value; - - // System.out.println("Value of deserialized counter "+this.counter); + stateLock.unlock(); + this.lastEid = state.getLastEid(); + return state.getLastEid(); } - @Override - public ApplicationState getState(int eid, boolean sendState) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int setState(int eid, ApplicationState state) { - throw new UnsupportedOperationException("Not supported yet."); - } /********************************************************/ } diff --git a/src/navigators/smart/tom/demo/counter/CounterState.java b/src/navigators/smart/tom/demo/counter/CounterState.java new file mode 100644 index 0000000000000000000000000000000000000000..b824d1eee3463b860b301a91dec128c181226e96 --- /dev/null +++ b/src/navigators/smart/tom/demo/counter/CounterState.java @@ -0,0 +1,97 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package navigators.smart.tom.demo.counter; + +import java.util.Arrays; +import navigators.smart.statemanagment.ApplicationState; + +/** + * + * @author Joao Sousa + */ +public class CounterState implements ApplicationState { + + private byte[] state; // State associated with the last checkpoint + private byte[] stateHash; // Hash of the state associated with the last checkpoint + private int lastEid = -1; // Execution ID for the last messages batch delivered to the application + private boolean hasState; // indicates if the replica really had the requested state + + /** + * Constructs a TansferableState + * This constructor should be used when there is a valid state to construct the object with + * @param messageBatches Batches received since the last checkpoint. + * @param state State associated with the last checkpoint + * @param stateHash Hash of the state associated with the last checkpoint + */ + public CounterState(int lastEid, byte[] state, byte[] stateHash) { + + this.lastEid = lastEid; // Execution ID for the last messages batch delivered to the application + this.state = state; // State associated with the last checkpoint + this.stateHash = stateHash; + this.hasState = true; + } + + /** + * Constructs a TansferableState + * This constructor should be used when there isn't a valid state to construct the object with + */ + public CounterState() { + + this.lastEid = -1; + this.state = null; // State associated with the last checkpoint + this.stateHash = null; + this.hasState = false; + } + + + @Override + public int getLastEid() { + return lastEid; + } + + @Override + public boolean hasState() { + return hasState; + } + + @Override + public void setSerializedState(byte[] state) { + this.state = state; + } + + @Override + public byte[] getSerializedState() { + return state; + } + + @Override + public byte[] getStateHash() { + return stateHash; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof CounterState) { + CounterState tState = (CounterState) obj; + + return (Arrays.equals(this.stateHash, tState.stateHash) && + tState.lastEid == this.lastEid && tState.hasState == this.hasState); + } + return false; + } + + @Override + public int hashCode() { + int hash = 1; + hash = hash * 31 + this.lastEid; + hash = hash * 31 + (this.hasState ? 1 : 0); + if (this.stateHash != null) { + for (int i = 0; i < this.stateHash.length; i++) hash = hash * 31 + (int) this.stateHash[i]; + } else { + hash = hash * 31 + 0; + } + return hash; + } +} diff --git a/src/navigators/smart/tom/demo/keyvalue/BFTMapImpl.java b/src/navigators/smart/tom/demo/keyvalue/BFTMapImpl.java index 09382a589a3aeb3ab1e15e1b73f5026fb981b0c9..1fa8ea8eb072bc53aa4bdb04eadd78a861eea1d2 100644 --- a/src/navigators/smart/tom/demo/keyvalue/BFTMapImpl.java +++ b/src/navigators/smart/tom/demo/keyvalue/BFTMapImpl.java @@ -58,6 +58,7 @@ public class BFTMapImpl extends DefaultRecoverable { public byte[] getSnapshot() { try { + //System.out.println("[getSnapshot] tables: " + tableMap.getSizeofTable()); // serialize to byte array and return ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(bos); diff --git a/src/navigators/smart/tom/demo/microbenchmarks/LatencyServer.java b/src/navigators/smart/tom/demo/microbenchmarks/LatencyServer.java index d8f5f3ad71b5c1173c6343830eb212cbc9bfc15b..80416be039f7f56732d7318f30fdc68ced9cdcec 100644 --- a/src/navigators/smart/tom/demo/microbenchmarks/LatencyServer.java +++ b/src/navigators/smart/tom/demo/microbenchmarks/LatencyServer.java @@ -132,7 +132,7 @@ public class LatencyServer implements SingleExecutable, Recoverable { } @Override - public int setState(int eid, ApplicationState state) { + public int setState(ApplicationState state) { throw new UnsupportedOperationException("Not supported yet."); } } diff --git a/src/navigators/smart/tom/demo/microbenchmarks/ThroughputLatencyServer.java b/src/navigators/smart/tom/demo/microbenchmarks/ThroughputLatencyServer.java index 8158b39aa7ac30b18a7a7a5f3f9dd8edeb66c43b..c94a656ca4e66e7d5c6e90ae39f424a32d146612 100644 --- a/src/navigators/smart/tom/demo/microbenchmarks/ThroughputLatencyServer.java +++ b/src/navigators/smart/tom/demo/microbenchmarks/ThroughputLatencyServer.java @@ -155,7 +155,7 @@ public final class ThroughputLatencyServer implements SingleExecutable, Recovera } @Override - public int setState(int eid, ApplicationState state) { + public int setState(ApplicationState state) { throw new UnsupportedOperationException("Not supported yet."); } } diff --git a/src/navigators/smart/tom/demo/random/RandomServer.java b/src/navigators/smart/tom/demo/random/RandomServer.java index 9d1a62638a9419fd5386d65b803503c3d95ed8fd..0e0e700137604bc616835344754d3df92669ed9a 100644 --- a/src/navigators/smart/tom/demo/random/RandomServer.java +++ b/src/navigators/smart/tom/demo/random/RandomServer.java @@ -32,6 +32,7 @@ import navigators.smart.statemanagment.ApplicationState; import navigators.smart.tom.ServiceReplica; import java.util.Scanner; import navigators.smart.tom.MessageContext; +import navigators.smart.tom.server.DefaultRecoverable; import navigators.smart.tom.server.Executable; import navigators.smart.tom.server.Recoverable; @@ -39,7 +40,7 @@ import navigators.smart.tom.server.Recoverable; * * @author Joao Sousa */ -public final class RandomServer implements Executable, Recoverable { +public final class RandomServer extends DefaultRecoverable { private int value = 0; private int iterations = 0; @@ -95,7 +96,7 @@ public final class RandomServer implements Executable, Recoverable { break; } - System.out.println("(" + id + ")[server] (" + iterations + " / " + + if (msgCtx != null) System.out.println("(" + id + ")[server] (" + iterations + " / " + msgCtx.getConsensusId() + " / " + msgCtx.getRegency() + ") Current value: " + value); ByteArrayOutputStream out = new ByteArrayOutputStream(4); @@ -174,13 +175,35 @@ public final class RandomServer implements Executable, Recoverable { } @Override - public ApplicationState getState(int eid, boolean sendState) { - throw new UnsupportedOperationException("Not supported yet."); + public void installSnapshot(byte[] state) { + int value = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + value += (state[i] & 0x000000FF) << shift; + } + + this.value = value; + } + + @Override + public byte[] getSnapshot() { + byte[] b = new byte[4]; + //byte[] b = new byte[1024 * 1024 * 30]; + //for (int i = 0; i > b.length; i++) b[i] = (byte) i; + for (int i = 0; i < 4; i++) { + int offset = (b.length - 1 - i) * 8; + b[i] = (byte) ((value >>> offset) & 0xFF); + } + return b; } @Override - public int setState(int eid, ApplicationState state) { - throw new UnsupportedOperationException("Not supported yet."); + public byte[][] executeBatch2(byte[][] commands, MessageContext[] msgCtxs) { + byte [][] replies = new byte[commands.length][]; + for (int i = 0; i < commands.length; i++) { + replies[i] = executeOrdered(commands[i], (msgCtxs != null ? msgCtxs[i] : null)); + } + return replies; } } diff --git a/src/navigators/smart/tom/server/CommandsInfo.java b/src/navigators/smart/tom/server/CommandsInfo.java index 97610a0ee9a34dc02dd23229af236c20543b5ec2..f588f968e684d1bb199c14713a199edac6e52b1e 100644 --- a/src/navigators/smart/tom/server/CommandsInfo.java +++ b/src/navigators/smart/tom/server/CommandsInfo.java @@ -54,25 +54,42 @@ public class CommandsInfo implements Serializable { CommandsInfo ci = (CommandsInfo) obj; if ((this.commands != null && ci.commands == null) || - (this.commands == null && ci.commands != null)) return false; + (this.commands == null && ci.commands != null)) { + //System.out.println("[CommandsInfo] returing FALSE!1"); + return false; + } if (this.commands != null && ci.commands != null) { - if (this.commands.length != ci.commands.length) return false; + if (this.commands.length != ci.commands.length) { + //System.out.println("[CommandsInfo] returing FALSE!2"); + return false; + } for (int i = 0; i < this.commands.length; i++) { - if (this.commands[i] == null && ci.commands[i] != null) return false; - - if (this.commands[i] != null && ci.commands[i] == null) return false; + if (this.commands[i] == null && ci.commands[i] != null) { + //System.out.println("[CommandsInfo] returing FALSE!3"); + return false; + } + + if (this.commands[i] != null && ci.commands[i] == null) { + //System.out.println("[CommandsInfo] returing FALSE!4"); + return false; + } if (!(this.commands[i] == null && ci.commands[i] == null) && - (!Arrays.equals(this.commands, ci.commands))) return false; + (!Arrays.equals(this.commands[i], ci.commands[i]))) { + //System.out.println("[CommandsInfo] returing FALSE!5" + (this.commands[i] == null) + " " + (ci.commands[i] == null)); + return false; + } } } - + //System.out.print("[CommandsInfo] returnig........"); + //System.out.println((this.round == ci.round) + " " + (this.leader == ci.leader)); return this.round == ci.round && this.leader == ci.leader; } + //System.out.println("[CommandsInfo] returing FALSE!"); return false; } diff --git a/src/navigators/smart/tom/server/DefaultApplicationState.java b/src/navigators/smart/tom/server/DefaultApplicationState.java index 23dd89fea6dfddb577f1cd330f573eacb4e795c7..52e5c8fd4a69caffd6eaf8428990ca3af32d75ed 100644 --- a/src/navigators/smart/tom/server/DefaultApplicationState.java +++ b/src/navigators/smart/tom/server/DefaultApplicationState.java @@ -181,29 +181,50 @@ public class DefaultApplicationState implements ApplicationState { DefaultApplicationState tState = (DefaultApplicationState) obj; if ((this.messageBatches != null && tState.messageBatches == null) || - (this.messageBatches == null && tState.messageBatches != null)) return false; + (this.messageBatches == null && tState.messageBatches != null)) { + //System.out.println("[DefaultApplicationState] returing FALSE1!"); + return false; + } if (this.messageBatches != null && tState.messageBatches != null) { - if (this.messageBatches.length != tState.messageBatches.length) return false; + if (this.messageBatches.length != tState.messageBatches.length) { + //System.out.println("[DefaultApplicationState] returing FALSE2!"); + return false; + } for (int i = 0; i < this.messageBatches.length; i++) { - if (this.messageBatches[i] == null && tState.messageBatches[i] != null) return false; + if (this.messageBatches[i] == null && tState.messageBatches[i] != null) { + //System.out.println("[DefaultApplicationState] returing FALSE3!"); + return false; + } - if (this.messageBatches[i] != null && tState.messageBatches[i] == null) return false; + if (this.messageBatches[i] != null && tState.messageBatches[i] == null) { + //System.out.println("[DefaultApplicationState] returing FALSE4!"); + return false; + } if (!(this.messageBatches[i] == null && tState.messageBatches[i] == null) && - (!this.messageBatches[i].equals(tState.messageBatches[i]))) return false; + (!this.messageBatches[i].equals(tState.messageBatches[i]))) { + //System.out.println("[DefaultApplicationState] returing FALSE5!" + (this.messageBatches[i] == null) + " " + (tState.messageBatches[i] == null)); + return false; + } } } + //System.out.print("[DefaultApplicationState] returing........."); + //System.out.println(Arrays.equals(this.stateHash, tState.stateHash) + " && " + + // (tState.lastCheckpointEid == this.lastCheckpointEid) + " && " + + // (tState.lastCheckpointRound == this.lastCheckpointRound) + " && " + + // (tState.lastCheckpointLeader == this.lastCheckpointLeader) + " && " + + // (tState.lastEid == this.lastEid) + " && " + (tState.hasState == this.hasState)); return (Arrays.equals(this.stateHash, tState.stateHash) && tState.lastCheckpointEid == this.lastCheckpointEid && tState.lastCheckpointRound == this.lastCheckpointRound && tState.lastCheckpointLeader == this.lastCheckpointLeader && tState.lastEid == this.lastEid && tState.hasState == this.hasState); } - System.out.println("returing FALSE!"); + //System.out.println("[DefaultApplicationState] returing FALSE!"); return false; } diff --git a/src/navigators/smart/tom/server/DefaultRecoverable.java b/src/navigators/smart/tom/server/DefaultRecoverable.java index f6321ed7373241fd6cba8e7aaf7a16a33f55a81a..eadb74e39dc0dda8cff0a4d189a09dd0df8da695 100644 --- a/src/navigators/smart/tom/server/DefaultRecoverable.java +++ b/src/navigators/smart/tom/server/DefaultRecoverable.java @@ -20,11 +20,12 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable public static final int CHECKPOINT_PERIOD = 50; - private ReentrantLock lockState = new ReentrantLock(); + private ReentrantLock logLock = new ReentrantLock(); private ReentrantLock hashLock = new ReentrantLock(); + private ReentrantLock stateLock = new ReentrantLock(); private MessageDigest md; - + private StateLog log; public DefaultRecoverable() { @@ -39,17 +40,24 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable public byte[][] executeBatch(byte[][] commands, MessageContext[] msgCtxs) { int eid = msgCtxs[0].getConsensusId(); + + stateLock.lock(); + byte[][] replies = executeBatch2(commands, msgCtxs); + stateLock.unlock(); if ((eid > 0) && ((eid % CHECKPOINT_PERIOD) == 0)) { - Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + eid); - saveState(getSnapshot(), eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/); + Logger.println("(DefaultRecoverable.executeBatch) Performing checkpoint for consensus " + eid); + stateLock.lock(); + byte[] snapshot = getSnapshot(); + stateLock.unlock(); + saveState(snapshot, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/); } else { - Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + eid); + Logger.println("(DefaultRecoverable.executeBatch) Storing message batch in the state log for consensus " + eid); saveCommands(commands, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/); } - - return executeBatch2(commands, msgCtxs); } + return replies; + } public final byte[] computeHash(byte[] data) { @@ -68,7 +76,7 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable StateLog thisLog = getLog(); - lockState.lock(); + logLock.lock(); Logger.println("(TOMLayer.saveState) Saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader); @@ -78,10 +86,10 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable thisLog.setLastCheckpointRound(decisionRound); thisLog.setLastCheckpointLeader(leader); - lockState.unlock(); - System.out.println("fiz checkpoint"); + logLock.unlock(); + /*System.out.println("fiz checkpoint"); System.out.println("tamanho do snapshot: " + snapshot.length); - System.out.println("tamanho do log: " + thisLog.getMessageBatches().length); + System.out.println("tamanho do log: " + thisLog.getMessageBatches().length);*/ Logger.println("(TOMLayer.saveState) Finished saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader); } @@ -89,42 +97,49 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable StateLog thisLog = getLog(); - lockState.lock(); + logLock.lock(); Logger.println("(TOMLayer.saveBatch) Saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader); thisLog.addMessageBatch(commands, decisionRound, leader); thisLog.setLastEid(lastEid); - lockState.unlock(); + logLock.unlock(); - System.out.println("guardei comandos"); - System.out.println("tamanho do log: " + thisLog.getNumBatches()); + /*System.out.println("guardei comandos"); + System.out.println("tamanho do log: " + thisLog.getNumBatches());*/ Logger.println("(TOMLayer.saveBatch) Finished saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader); } @Override public ApplicationState getState(int eid, boolean sendState) { - return (eid > -1 ? getLog().getTransferableState(eid, sendState) : new DefaultApplicationState()); + logLock.lock(); + ApplicationState ret = (eid > -1 ? getLog().getApplicationState(eid, sendState) : new DefaultApplicationState()); + logLock.unlock(); + return ret; } @Override - public int setState(int recvEid, ApplicationState recvState) { + public int setState(ApplicationState recvState) { int lastEid = -1; if (recvState instanceof DefaultApplicationState) { DefaultApplicationState state = (DefaultApplicationState) recvState; + System.out.println("(DefaultRecoverable.setState) last eid in state: " + state.getLastEid()); + getLog().update(state); int lastCheckpointEid = state.getLastCheckpointEid(); - //int lastEid = state.getLastEid(); - lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0); + + lastEid = state.getLastEid(); + //lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0); - navigators.smart.tom.util.Logger.println("(DeliveryThread.update) I'm going to update myself from EID " + navigators.smart.tom.util.Logger.println("(DefaultRecoverable.setState) I'm going to update myself from EID " + lastCheckpointEid + " to EID " + lastEid); + stateLock.lock(); installSnapshot(state.getState()); // INUTIL?????? @@ -133,14 +148,17 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) { try { + + navigators.smart.tom.util.Logger.println("(DefaultRecoverable.setState) interpreting and verifying batched requests for eid " + eid); + System.out.println("(DefaultRecoverable.setState) interpreting and verifying batched requests for eid " + eid); + if (state.getMessageBatch(eid) == null) System.out.println("(DefaultRecoverable.setState) " + eid + " NULO!!!"); + byte[][] commands = state.getMessageBatch(eid).commands; // take a batch // INUTIL?????? //tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round, // state.getMessageBatch(eid).leader); - navigators.smart.tom.util.Logger.println("(DeliveryThread.update) interpreting and verifying batched requests."); - //TROCAR POR EXECUTE E ARRAY DE MENSAGENS!!!!!! //TOMMessage[] requests = new BatchReader(batch, // manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager); @@ -172,6 +190,7 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable } } + stateLock.unlock(); } diff --git a/src/navigators/smart/tom/server/Recoverable.java b/src/navigators/smart/tom/server/Recoverable.java index 87f13eb7eab64c543db8a126f513e1826890dba8..4449045b355144206025f986efa0b222826008fa 100644 --- a/src/navigators/smart/tom/server/Recoverable.java +++ b/src/navigators/smart/tom/server/Recoverable.java @@ -25,6 +25,6 @@ public interface Recoverable { * @param state State obtained in the state transfer protocol * @return */ - public int setState(int eid, ApplicationState state); + public int setState(ApplicationState state); } diff --git a/src/navigators/smart/tom/server/StateLog.java b/src/navigators/smart/tom/server/StateLog.java index 17083d8c0ce4f3ccef4bc8aa28ea8ac7fb6a6b92..2359bdb43633553a6c2d4254191b099d11a43e50 100644 --- a/src/navigators/smart/tom/server/StateLog.java +++ b/src/navigators/smart/tom/server/StateLog.java @@ -18,6 +18,8 @@ package navigators.smart.tom.server; +import org.apache.commons.codec.binary.Base64; + /** * This classes serves as a log for the state associated with the last checkpoint, and the message * batches received since the same checkpoint until the present. The state associated with the last @@ -65,7 +67,7 @@ public class StateLog { position = 0; this.state = state; this.stateHash = stateHash; - + } /** @@ -203,13 +205,14 @@ public class StateLog { * @param eid Execution ID correspondent to desired state * @return TransferableState Object containing this log information */ - public DefaultApplicationState getTransferableState(int eid, boolean setState) { + public DefaultApplicationState getApplicationState(int eid, boolean setState) { if (lastCheckpointEid > -1 && eid >= lastCheckpointEid) { CommandsInfo[] batches = null; - if (eid <= lastEid) { + int lastEid = -1; + if (eid <= this.lastEid) { int size = eid - lastCheckpointEid ; if (size > 0) { @@ -218,11 +221,13 @@ public class StateLog { for (int i = 0; i < size; i++) batches[i] = messageBatches[i]; } - } else if (lastEid > -1) { + lastEid = eid; + } else if (this.lastEid > -1) { batches = messageBatches; + lastEid = this.lastEid; } - return new DefaultApplicationState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, eid, (setState ? state : null), stateHash); + return new DefaultApplicationState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, lastEid, (setState ? state : null), stateHash); } else return null;