提交 22ad56f4 编写于 作者: L liquidsnake@sapo.pt

Fixed some bugs on the new state transfer. Its more stable, but more testing is required.

上级 0902732a
......@@ -221,7 +221,7 @@ public final class ExecutionManager {
Logger.println("(ExecutionManager.checkLimits) Message for execution " +
msg.getNumber() + " is out of context, adding it to out of context set");
//System.out.println("(ExecutionManager.checkLimits) Message for execution " +
// msg.getNumber() + " is out of context, adding it to out of context set; isRetrievingState="+isRetrievingState);
......@@ -230,7 +230,7 @@ public final class ExecutionManager {
} else { //can process!
Logger.println("(ExecutionManager.checkLimits) message for execution " +
msg.getNumber() + " can be processed");
//Logger.debug = false;
canProcessTheMessage = true;
}
......
......@@ -93,8 +93,11 @@ public final class Acceptor {
*/
public final void deliver(PaxosMessage msg) {
if (manager.checkLimits(msg)) {
Logger.println("processing paxos msg with id " + msg.getNumber());
//Logger.debug = false;
processMessage(msg);
} else {
Logger.println("out of context msg with id " + msg.getNumber());
tomLayer.processOutOfContext();
}
}
......
......@@ -49,7 +49,7 @@ public class StateManager {
private HashSet<SenderRegency> senderRegencies = null;
private HashSet<SenderLeader> senderLeaders = null;
private ReentrantLock lockState = new ReentrantLock();
//private ReentrantLock lockState = new ReentrantLock();
private ReentrantLock lockTimer = new ReentrantLock();
private Timer stateTimer = null;
......@@ -160,6 +160,14 @@ public class StateManager {
senderRegencies.clear();
}
public void emptyViews() {
senderViews.clear();
}
public void emptyLeaders() {
senderLeaders.clear();
}
public void emptyRegencies(int regency) {
for (SenderRegency m : senderRegencies)
if (m.regency <= regency) senderRegencies.remove(m);
......@@ -263,11 +271,14 @@ public class StateManager {
senderStates.toArray(st);
int count = 0;
for (int i = 0; i < st.length; i++) {
for (int i = 0; i < st.length; i++, count = 0) {
for (int j = i; j < st.length; j++) {
for (int j = 0; j < st.length; j++) {
System.out.println("PID " + st[j].sender + " sent EID " + st[j].state.getLastEid());
//System.out.println(st[i].state.equals(st[j].state) + " && " + st[j].state.hasState());
if (st[i].state.equals(st[j].state) && st[j].state.hasState()) count++;
System.out.println("Count: " + count);
//******* EDUARDO BEGIN **************//
if (count > SVManager.getCurrentViewF()) return st[j].state;
//******* EDUARDO END **************//
......@@ -366,6 +377,10 @@ public class StateManager {
//setWaiting(-1);
changeReplica();
emptyStates();
emptyEIDs();
emptyLeaders();
emptyRegencies();
emptyViews();
setReplicaState(null);
requestState();
......@@ -382,7 +397,7 @@ public class StateManager {
System.out.println("(TOMLayer.SMRequestDeliver) The state transfer protocol is enabled");
lockState.lock();
//lockState.lock();
System.out.println("(TOMLayer.SMRequestDeliver) I received a state request for EID " + msg.getEid() + " from replica " + msg.getSender());
......@@ -392,7 +407,7 @@ public class StateManager {
//TransferableState thisState = getLog().getTransferableState(msg.getEid(), sendState);
ApplicationState thisState = dt.getRecoverer().getState(msg.getEid(), sendState);
lockState.unlock();
//lockState.unlock();
if (thisState == null) {
System.out.println("(TOMLayer.SMRequestDeliver) I don't have the state requested :-(");
......@@ -437,7 +452,7 @@ public class StateManager {
if (moreThan2F_Views(msg.getView())) {
currentView = msg.getView();
if (currentView.isMember(SVManager.getStaticConf().getProcessId())) {
System.out.println("Not a member anymore!");
System.out.println("Not a member!");
}
}
......@@ -453,9 +468,11 @@ public class StateManager {
if (moreThanF_Replies()) {
System.out.println("(TOMLayer.SMReplyDeliver) I have at least " + SVManager.getCurrentViewF() + " replies!");
System.out.println("(TOMLayer.SMReplyDeliver) I have more than " + SVManager.getCurrentViewF() + " replies!");
System.out.println("[StateManager.getValidHash]");
ApplicationState recvState = getValidHash();
System.out.println("[/StateManager.getValidHash]");
int haveState = 0;
if (getReplicaState() != null) {
......@@ -493,7 +510,10 @@ public class StateManager {
setWaiting(-1);
dt.update(msg.getEid(), recvState);
//Logger.debug = true;
System.out.println("EID requested " + msg.getEid());
dt.update(recvState);
//Deal with stopped messages that may come from synchronization phase
if (execManager.stopped()) {
......@@ -502,7 +522,7 @@ public class StateManager {
for (PaxosMessage stopped : stoppedMsgs) {
if (stopped.getNumber() > msg.getEid())
if (stopped.getNumber() > recvState.getLastEid() /*msg.getEid()*/)
execManager.addOutOfContextMessage(stopped);
}
......@@ -510,6 +530,7 @@ public class StateManager {
execManager.restart();
}
Logger.println("Processing out of context messages");
tomLayer.processOutOfContext();
if (SVManager.getCurrentViewId() != currentView.getId()) {
......@@ -523,6 +544,10 @@ public class StateManager {
dt.deliverUnlock();
emptyStates();
emptyEIDs();
emptyLeaders();
emptyRegencies();
emptyViews();
setReplicaState(null);
System.out.println("I updated the state!");
......@@ -539,6 +564,10 @@ public class StateManager {
setWaiting(-1);
emptyStates();
emptyEIDs();
emptyLeaders();
emptyRegencies();
emptyViews();
setReplicaState(null);
//requestState();
......@@ -550,6 +579,10 @@ public class StateManager {
//setWaiting(-1);
changeReplica();
emptyStates();
emptyEIDs();
emptyLeaders();
emptyRegencies();
emptyViews();
setReplicaState(null);
requestState();
......
......@@ -123,11 +123,12 @@ public final class DeliveryThread extends Thread {
canDeliver.signalAll();
}
public void update(int eid, ApplicationState state) {
public void update(ApplicationState state) {
int lastEid = recoverer.setState(eid, state);
int lastEid = recoverer.setState(state);
//set this consensus as the last executed
System.out.println("Setting last EID to " + lastEid);
tomLayer.setLastExec(lastEid);
//define the last stable consensus... the stable consensus can
......
......@@ -24,9 +24,13 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.locks.ReentrantLock;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.server.BatchExecutable;
import navigators.smart.tom.server.SingleExecutable;
import navigators.smart.tom.server.Recoverable;
......@@ -35,14 +39,24 @@ import navigators.smart.tom.server.Recoverable;
* Example replica that implements a BFT replicated service (a counter).
*
*/
public final class CounterServer implements SingleExecutable, Recoverable {
public final class CounterServer implements BatchExecutable, Recoverable {
private ServiceReplica replica;
private ServiceReplica replica;
private int counter = 0;
private int iterations = 0;
private MessageDigest md;
private ReentrantLock stateLock = new ReentrantLock();
private int lastEid = -1;
public CounterServer(int id) {
replica = new ServiceReplica(id, this, this);
try {
md = MessageDigest.getInstance("MD5"); // TODO: shouldn't it be SHA?
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
}
//******* EDUARDO BEGIN **************//
......@@ -53,12 +67,23 @@ public final class CounterServer implements SingleExecutable, Recoverable {
@Override
public byte[] executeOrdered(byte[] command, MessageContext msgCtx) {
return execute(command,msgCtx);
public byte[][] executeBatch(byte[][] commands, MessageContext[] msgCtxs) {
stateLock.lock();
byte [][] replies = new byte[commands.length][];
for (int i = 0; i < commands.length; i++) {
replies[i] = execute(commands[i],msgCtxs[i]);
}
stateLock.unlock();
return replies;
}
@Override
public byte[] executeUnordered(byte[] command, MessageContext msgCtx) {
return execute(command,msgCtx);
}
......@@ -68,6 +93,8 @@ public final class CounterServer implements SingleExecutable, Recoverable {
int increment = new DataInputStream(new ByteArrayInputStream(command)).readInt();
//System.out.println("read-only request: "+(msgCtx.getConsensusId() == -1));
counter += increment;
lastEid = msgCtx.getConsensusId();
if (msgCtx.getConsensusId() == -1)
System.out.println("(" + iterations + ") Counter incremented: " + counter);
else
......@@ -96,43 +123,47 @@ public final class CounterServer implements SingleExecutable, Recoverable {
}
/** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */
public byte[] getState() {
//System.out.println("reading counter: "+this.counter);
@Override
public ApplicationState getState(int eid, boolean sendState) {
stateLock.lock();
if (eid == -1 || eid > lastEid) return new CounterState();
byte[] b = new byte[4];
byte[] d = null;
for (int i = 0; i < 4; i++) {
int offset = (b.length - 1 - i) * 8;
b[i] = (byte) ((counter >>> offset) & 0xFF);
}
return b;
//throw new UnsupportedOperationException("Not supported yet.");
stateLock.unlock();
d = md.digest(b);
return new CounterState(lastEid, (sendState ? b : null), d);
}
public void setState(byte[] state) {
@Override
public int setState(ApplicationState state) {
int value = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
value += (state[i] & 0x000000FF) << shift;
value += (state.getSerializedState()[i] & 0x000000FF) << shift;
}
//System.out.println("setting counter to: "+value);
stateLock.lock();
this.counter = value;
// System.out.println("Value of deserialized counter "+this.counter);
stateLock.unlock();
this.lastEid = state.getLastEid();
return state.getLastEid();
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
/********************************************************/
}
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package navigators.smart.tom.demo.counter;
import java.util.Arrays;
import navigators.smart.statemanagment.ApplicationState;
/**
*
* @author Joao Sousa
*/
public class CounterState implements ApplicationState {
private byte[] state; // State associated with the last checkpoint
private byte[] stateHash; // Hash of the state associated with the last checkpoint
private int lastEid = -1; // Execution ID for the last messages batch delivered to the application
private boolean hasState; // indicates if the replica really had the requested state
/**
* Constructs a TansferableState
* This constructor should be used when there is a valid state to construct the object with
* @param messageBatches Batches received since the last checkpoint.
* @param state State associated with the last checkpoint
* @param stateHash Hash of the state associated with the last checkpoint
*/
public CounterState(int lastEid, byte[] state, byte[] stateHash) {
this.lastEid = lastEid; // Execution ID for the last messages batch delivered to the application
this.state = state; // State associated with the last checkpoint
this.stateHash = stateHash;
this.hasState = true;
}
/**
* Constructs a TansferableState
* This constructor should be used when there isn't a valid state to construct the object with
*/
public CounterState() {
this.lastEid = -1;
this.state = null; // State associated with the last checkpoint
this.stateHash = null;
this.hasState = false;
}
@Override
public int getLastEid() {
return lastEid;
}
@Override
public boolean hasState() {
return hasState;
}
@Override
public void setSerializedState(byte[] state) {
this.state = state;
}
@Override
public byte[] getSerializedState() {
return state;
}
@Override
public byte[] getStateHash() {
return stateHash;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CounterState) {
CounterState tState = (CounterState) obj;
return (Arrays.equals(this.stateHash, tState.stateHash) &&
tState.lastEid == this.lastEid && tState.hasState == this.hasState);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.lastEid;
hash = hash * 31 + (this.hasState ? 1 : 0);
if (this.stateHash != null) {
for (int i = 0; i < this.stateHash.length; i++) hash = hash * 31 + (int) this.stateHash[i];
} else {
hash = hash * 31 + 0;
}
return hash;
}
}
......@@ -58,6 +58,7 @@ public class BFTMapImpl extends DefaultRecoverable {
public byte[] getSnapshot() {
try {
//System.out.println("[getSnapshot] tables: " + tableMap.getSizeofTable());
// serialize to byte array and return
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = new ObjectOutputStream(bos);
......
......@@ -132,7 +132,7 @@ public class LatencyServer implements SingleExecutable, Recoverable {
}
@Override
public int setState(int eid, ApplicationState state) {
public int setState(ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
......@@ -155,7 +155,7 @@ public final class ThroughputLatencyServer implements SingleExecutable, Recovera
}
@Override
public int setState(int eid, ApplicationState state) {
public int setState(ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
......@@ -32,6 +32,7 @@ import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.ServiceReplica;
import java.util.Scanner;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.server.DefaultRecoverable;
import navigators.smart.tom.server.Executable;
import navigators.smart.tom.server.Recoverable;
......@@ -39,7 +40,7 @@ import navigators.smart.tom.server.Recoverable;
*
* @author Joao Sousa
*/
public final class RandomServer implements Executable, Recoverable {
public final class RandomServer extends DefaultRecoverable {
private int value = 0;
private int iterations = 0;
......@@ -95,7 +96,7 @@ public final class RandomServer implements Executable, Recoverable {
break;
}
System.out.println("(" + id + ")[server] (" + iterations + " / " +
if (msgCtx != null) System.out.println("(" + id + ")[server] (" + iterations + " / " +
msgCtx.getConsensusId() + " / " + msgCtx.getRegency() + ") Current value: " + value);
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
......@@ -174,13 +175,35 @@ public final class RandomServer implements Executable, Recoverable {
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
public void installSnapshot(byte[] state) {
int value = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
value += (state[i] & 0x000000FF) << shift;
}
this.value = value;
}
@Override
public byte[] getSnapshot() {
byte[] b = new byte[4];
//byte[] b = new byte[1024 * 1024 * 30];
//for (int i = 0; i > b.length; i++) b[i] = (byte) i;
for (int i = 0; i < 4; i++) {
int offset = (b.length - 1 - i) * 8;
b[i] = (byte) ((value >>> offset) & 0xFF);
}
return b;
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
public byte[][] executeBatch2(byte[][] commands, MessageContext[] msgCtxs) {
byte [][] replies = new byte[commands.length][];
for (int i = 0; i < commands.length; i++) {
replies[i] = executeOrdered(commands[i], (msgCtxs != null ? msgCtxs[i] : null));
}
return replies;
}
}
......@@ -54,25 +54,42 @@ public class CommandsInfo implements Serializable {
CommandsInfo ci = (CommandsInfo) obj;
if ((this.commands != null && ci.commands == null) ||
(this.commands == null && ci.commands != null)) return false;
(this.commands == null && ci.commands != null)) {
//System.out.println("[CommandsInfo] returing FALSE!1");
return false;
}
if (this.commands != null && ci.commands != null) {
if (this.commands.length != ci.commands.length) return false;
if (this.commands.length != ci.commands.length) {
//System.out.println("[CommandsInfo] returing FALSE!2");
return false;
}
for (int i = 0; i < this.commands.length; i++) {
if (this.commands[i] == null && ci.commands[i] != null) return false;
if (this.commands[i] != null && ci.commands[i] == null) return false;
if (this.commands[i] == null && ci.commands[i] != null) {
//System.out.println("[CommandsInfo] returing FALSE!3");
return false;
}
if (this.commands[i] != null && ci.commands[i] == null) {
//System.out.println("[CommandsInfo] returing FALSE!4");
return false;
}
if (!(this.commands[i] == null && ci.commands[i] == null) &&
(!Arrays.equals(this.commands, ci.commands))) return false;
(!Arrays.equals(this.commands[i], ci.commands[i]))) {
//System.out.println("[CommandsInfo] returing FALSE!5" + (this.commands[i] == null) + " " + (ci.commands[i] == null));
return false;
}
}
}
//System.out.print("[CommandsInfo] returnig........");
//System.out.println((this.round == ci.round) + " " + (this.leader == ci.leader));
return this.round == ci.round && this.leader == ci.leader;
}
//System.out.println("[CommandsInfo] returing FALSE!");
return false;
}
......
......@@ -181,29 +181,50 @@ public class DefaultApplicationState implements ApplicationState {
DefaultApplicationState tState = (DefaultApplicationState) obj;
if ((this.messageBatches != null && tState.messageBatches == null) ||
(this.messageBatches == null && tState.messageBatches != null)) return false;
(this.messageBatches == null && tState.messageBatches != null)) {
//System.out.println("[DefaultApplicationState] returing FALSE1!");
return false;
}
if (this.messageBatches != null && tState.messageBatches != null) {
if (this.messageBatches.length != tState.messageBatches.length) return false;
if (this.messageBatches.length != tState.messageBatches.length) {
//System.out.println("[DefaultApplicationState] returing FALSE2!");
return false;
}
for (int i = 0; i < this.messageBatches.length; i++) {
if (this.messageBatches[i] == null && tState.messageBatches[i] != null) return false;
if (this.messageBatches[i] == null && tState.messageBatches[i] != null) {
//System.out.println("[DefaultApplicationState] returing FALSE3!");
return false;
}
if (this.messageBatches[i] != null && tState.messageBatches[i] == null) return false;
if (this.messageBatches[i] != null && tState.messageBatches[i] == null) {
//System.out.println("[DefaultApplicationState] returing FALSE4!");
return false;
}
if (!(this.messageBatches[i] == null && tState.messageBatches[i] == null) &&
(!this.messageBatches[i].equals(tState.messageBatches[i]))) return false;
(!this.messageBatches[i].equals(tState.messageBatches[i]))) {
//System.out.println("[DefaultApplicationState] returing FALSE5!" + (this.messageBatches[i] == null) + " " + (tState.messageBatches[i] == null));
return false;
}
}
}
//System.out.print("[DefaultApplicationState] returing.........");
//System.out.println(Arrays.equals(this.stateHash, tState.stateHash) + " && " +
// (tState.lastCheckpointEid == this.lastCheckpointEid) + " && " +
// (tState.lastCheckpointRound == this.lastCheckpointRound) + " && " +
// (tState.lastCheckpointLeader == this.lastCheckpointLeader) + " && " +
// (tState.lastEid == this.lastEid) + " && " + (tState.hasState == this.hasState));
return (Arrays.equals(this.stateHash, tState.stateHash) &&
tState.lastCheckpointEid == this.lastCheckpointEid &&
tState.lastCheckpointRound == this.lastCheckpointRound &&
tState.lastCheckpointLeader == this.lastCheckpointLeader &&
tState.lastEid == this.lastEid && tState.hasState == this.hasState);
}
System.out.println("returing FALSE!");
//System.out.println("[DefaultApplicationState] returing FALSE!");
return false;
}
......
......@@ -20,11 +20,12 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
public static final int CHECKPOINT_PERIOD = 50;
private ReentrantLock lockState = new ReentrantLock();
private ReentrantLock logLock = new ReentrantLock();
private ReentrantLock hashLock = new ReentrantLock();
private ReentrantLock stateLock = new ReentrantLock();
private MessageDigest md;
private StateLog log;
public DefaultRecoverable() {
......@@ -39,17 +40,24 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
public byte[][] executeBatch(byte[][] commands, MessageContext[] msgCtxs) {
int eid = msgCtxs[0].getConsensusId();
stateLock.lock();
byte[][] replies = executeBatch2(commands, msgCtxs);
stateLock.unlock();
if ((eid > 0) && ((eid % CHECKPOINT_PERIOD) == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + eid);
saveState(getSnapshot(), eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
Logger.println("(DefaultRecoverable.executeBatch) Performing checkpoint for consensus " + eid);
stateLock.lock();
byte[] snapshot = getSnapshot();
stateLock.unlock();
saveState(snapshot, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + eid);
Logger.println("(DefaultRecoverable.executeBatch) Storing message batch in the state log for consensus " + eid);
saveCommands(commands, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
}
return executeBatch2(commands, msgCtxs); }
return replies;
}
public final byte[] computeHash(byte[] data) {
......@@ -68,7 +76,7 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
StateLog thisLog = getLog();
lockState.lock();
logLock.lock();
Logger.println("(TOMLayer.saveState) Saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
......@@ -78,10 +86,10 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
thisLog.setLastCheckpointRound(decisionRound);
thisLog.setLastCheckpointLeader(leader);
lockState.unlock();
System.out.println("fiz checkpoint");
logLock.unlock();
/*System.out.println("fiz checkpoint");
System.out.println("tamanho do snapshot: " + snapshot.length);
System.out.println("tamanho do log: " + thisLog.getMessageBatches().length);
System.out.println("tamanho do log: " + thisLog.getMessageBatches().length);*/
Logger.println("(TOMLayer.saveState) Finished saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
......@@ -89,42 +97,49 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
StateLog thisLog = getLog();
lockState.lock();
logLock.lock();
Logger.println("(TOMLayer.saveBatch) Saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
thisLog.addMessageBatch(commands, decisionRound, leader);
thisLog.setLastEid(lastEid);
lockState.unlock();
logLock.unlock();
System.out.println("guardei comandos");
System.out.println("tamanho do log: " + thisLog.getNumBatches());
/*System.out.println("guardei comandos");
System.out.println("tamanho do log: " + thisLog.getNumBatches());*/
Logger.println("(TOMLayer.saveBatch) Finished saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
return (eid > -1 ? getLog().getTransferableState(eid, sendState) : new DefaultApplicationState());
logLock.lock();
ApplicationState ret = (eid > -1 ? getLog().getApplicationState(eid, sendState) : new DefaultApplicationState());
logLock.unlock();
return ret;
}
@Override
public int setState(int recvEid, ApplicationState recvState) {
public int setState(ApplicationState recvState) {
int lastEid = -1;
if (recvState instanceof DefaultApplicationState) {
DefaultApplicationState state = (DefaultApplicationState) recvState;
System.out.println("(DefaultRecoverable.setState) last eid in state: " + state.getLastEid());
getLog().update(state);
int lastCheckpointEid = state.getLastCheckpointEid();
//int lastEid = state.getLastEid();
lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
lastEid = state.getLastEid();
//lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
navigators.smart.tom.util.Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
navigators.smart.tom.util.Logger.println("(DefaultRecoverable.setState) I'm going to update myself from EID "
+ lastCheckpointEid + " to EID " + lastEid);
stateLock.lock();
installSnapshot(state.getState());
// INUTIL??????
......@@ -133,14 +148,17 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
try {
navigators.smart.tom.util.Logger.println("(DefaultRecoverable.setState) interpreting and verifying batched requests for eid " + eid);
System.out.println("(DefaultRecoverable.setState) interpreting and verifying batched requests for eid " + eid);
if (state.getMessageBatch(eid) == null) System.out.println("(DefaultRecoverable.setState) " + eid + " NULO!!!");
byte[][] commands = state.getMessageBatch(eid).commands; // take a batch
// INUTIL??????
//tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
// state.getMessageBatch(eid).leader);
navigators.smart.tom.util.Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
//TROCAR POR EXECUTE E ARRAY DE MENSAGENS!!!!!!
//TOMMessage[] requests = new BatchReader(batch,
// manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
......@@ -172,6 +190,7 @@ public abstract class DefaultRecoverable implements Recoverable, BatchExecutable
}
}
stateLock.unlock();
}
......
......@@ -25,6 +25,6 @@ public interface Recoverable {
* @param state State obtained in the state transfer protocol
* @return
*/
public int setState(int eid, ApplicationState state);
public int setState(ApplicationState state);
}
......@@ -18,6 +18,8 @@
package navigators.smart.tom.server;
import org.apache.commons.codec.binary.Base64;
/**
* This classes serves as a log for the state associated with the last checkpoint, and the message
* batches received since the same checkpoint until the present. The state associated with the last
......@@ -65,7 +67,7 @@ public class StateLog {
position = 0;
this.state = state;
this.stateHash = stateHash;
}
/**
......@@ -203,13 +205,14 @@ public class StateLog {
* @param eid Execution ID correspondent to desired state
* @return TransferableState Object containing this log information
*/
public DefaultApplicationState getTransferableState(int eid, boolean setState) {
public DefaultApplicationState getApplicationState(int eid, boolean setState) {
if (lastCheckpointEid > -1 && eid >= lastCheckpointEid) {
CommandsInfo[] batches = null;
if (eid <= lastEid) {
int lastEid = -1;
if (eid <= this.lastEid) {
int size = eid - lastCheckpointEid ;
if (size > 0) {
......@@ -218,11 +221,13 @@ public class StateLog {
for (int i = 0; i < size; i++)
batches[i] = messageBatches[i];
}
} else if (lastEid > -1) {
lastEid = eid;
} else if (this.lastEid > -1) {
batches = messageBatches;
lastEid = this.lastEid;
}
return new DefaultApplicationState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, eid, (setState ? state : null), stateHash);
return new DefaultApplicationState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, lastEid, (setState ? state : null), stateHash);
}
else return null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册