提交 792ce889 编写于 作者: L liquidsnake@sapo.pt

Copied bug fixes from branch to main trunk, part 2

上级 64233ac8
......@@ -147,7 +147,7 @@ public final class ExecutionManager {
if (tomLayer.getInExec() != -1) {
stoppedRound = getExecution(tomLayer.getInExec()).getLastRound();
//stoppedRound.getTimeoutTask().cancel();
Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + stoppedRound.getExecution().getId());
if (stoppedRound != null) Logger.println("(ExecutionManager.stop) Stoping round " + stoppedRound.getNumber() + " of consensus " + tomLayer.getInExec());
}
stoppedMsgsLock.unlock();
}
......@@ -412,6 +412,7 @@ public final class ExecutionManager {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
if (m.getPaxosType() == MessageFactory.PROPOSE) {
Logger.println("(ExecutionManager.addOutOfContextMessage) adding " + m);
outOfContextProposes.put(m.getNumber(), m);
} else {
List<PaxosMessage> messages = outOfContext.get(m.getNumber());
......@@ -419,11 +420,9 @@ public final class ExecutionManager {
messages = new LinkedList<PaxosMessage>();
outOfContext.put(m.getNumber(), messages);
}
Logger.println("(ExecutionManager.addOutOfContextMessage) adding " + m);
messages.add(m);
if (outOfContext.size() % 1000 == 0) {
Logger.println("(ExecutionManager.addOutOfContextMessage) out-of-context size: " + outOfContext.size());
}
}
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
......
......@@ -235,7 +235,10 @@ public final class Acceptor {
round.setStrong(me, value);
round.getExecution().getLearner().firstMessageProposed.strongSentTime = System.nanoTime();
if(round.getExecution().getLearner().firstMessageProposed==null) {
round.getExecution().getLearner().firstMessageProposed.strongSentTime = System.nanoTime();
}
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
factory.createStrong(eid, round.getNumber(), value));
......
......@@ -74,6 +74,8 @@ public class StateManager {
private LCManager lcManager;
private ExecutionManager execManager;
private boolean appStateOnly;
public StateManager(ServerViewManager manager, TOMLayer tomLayer, DeliveryThread dt, LCManager lcManager, ExecutionManager execManager) {
//******* EDUARDO BEGIN **************//
......@@ -98,6 +100,8 @@ public class StateManager {
this.state = null;
this.lastEid = -1;
this.waitingEid = -1;
appStateOnly = false;
}
public int getReplica() {
......@@ -319,6 +323,14 @@ public class StateManager {
return senderStates.size();
}
public void requestAppState(int eid) {
setLastEID(eid + 1);
setWaiting(eid);
appStateOnly = true;
requestState();
}
public void analyzeState(int sender, int eid) {
Logger.println("(TOMLayer.analyzeState) The state transfer protocol is enabled");
......@@ -436,56 +448,6 @@ public class StateManager {
}
}
public void temp(byte[] state) {
try {
// serialize to byte array and return
ByteArrayInputStream bis = new ByteArrayInputStream(state);
ObjectInput in = new ObjectInputStream(bis);
BFTTableMap tableMap = (BFTTableMap) in.readObject();
in.close();
bis.close();
showTables(tableMap);
} catch (ClassNotFoundException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void showTables(BFTTableMap tableMap) {
try {
Map<String, Map<String, byte[]>> tables = tableMap.getTables();
Collection<String> tableNames = tables.keySet();
ByteArrayOutputStream baos = new ByteArrayOutputStream(10000);
DataOutputStream dos = new DataOutputStream(baos);
for(String tableName : tableNames) {
System.out.println("[StateManager][showTables] Table name: " + tableName);
dos.writeUTF(tableName);
Map<String, byte[]> tableTmp = tables.get(tableName);
dos.writeInt(tableTmp.size());
for(String key : tableTmp.keySet()) {
dos.writeUTF(key);
dos.flush();
byte[] value = tableTmp.get(key);
dos.writeInt(value.length);
dos.write(value);
dos.flush();
System.out.println("[StateManager][showTables] ---- Size of key '" + key + "': " + value.length);
}
System.out.println("[StateManager][showTables] ---- Count of rows for table '" + tableName + "': " + tableTmp.size());
dos.flush();
}
byte[] state = baos.toByteArray();
System.out.println("[StateManager][showTables] Current byte array size: " + state.length);
//return state;
} catch (IOException ex) {
ex.printStackTrace();
//return new byte[0];
}
}
public void SMReplyDeliver(SMMessage msg) {
......@@ -503,24 +465,25 @@ public class StateManager {
int currentRegency = -1;
int currentLeader = -1;
View currentView = null;
addRegency(msg.getSender(), msg.getRegency());
addLeader(msg.getSender(), msg.getLeader());
addView(msg.getSender(), msg.getView());
if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency();
if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader();
if (moreThan2F_Views(msg.getView())) {
currentView = msg.getView();
if (currentView.isMember(SVManager.getStaticConf().getProcessId())) {
System.out.println("Not a member!");
if (!appStateOnly) {
addRegency(msg.getSender(), msg.getRegency());
addLeader(msg.getSender(), msg.getLeader());
addView(msg.getSender(), msg.getView());
if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency();
if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader();
if (moreThan2F_Views(msg.getView())) {
currentView = msg.getView();
if (!currentView.isMember(SVManager.getStaticConf().getProcessId())) {
System.out.println("Not a member!");
}
}
} else {
currentLeader = tomLayer.lm.getCurrentLeader();
currentRegency = lcManager.getLastReg();
currentView = SVManager.getCurrentView();
}
/*if (msg.getState().hasState()) {
System.out.println("(TOMLayer.SMReplyDeliver) Snapshot da replica " + msg.getSender() + " para o estado " + ((DefaultApplicationState) msg.getState()).getLastCheckpointEid());
temp(msg.getState().getSerializedState());
}*/
System.out.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!");
if (msg.getSender() == getReplica() && msg.getState().getSerializedState() != null) {
......@@ -583,7 +546,7 @@ public class StateManager {
dt.update(recvState);
//Deal with stopped messages that may come from synchronization phase
if (execManager.stopped()) {
if (!appStateOnly && execManager.stopped()) {
Queue<PaxosMessage> stoppedMsgs = execManager.getStoppedMsgs();
......@@ -622,6 +585,10 @@ public class StateManager {
tomLayer.requestsTimer.Enabled(true);
tomLayer.requestsTimer.startTimer();
if (appStateOnly) {
appStateOnly = false;
tomLayer.resumeLC();
}
//******* EDUARDO BEGIN **************//
} else if (recvState == null && (SVManager.getCurrentViewN() / 2) < getReplies()) {
//******* EDUARDO END **************//
......@@ -639,6 +606,10 @@ public class StateManager {
//requestState();
if (stateTimer != null) stateTimer.cancel();
if (appStateOnly) {
requestState();
}
} else if (haveState == -1) {
System.out.println("(TOMLayer.SMReplyDeliver) The replica from which I expected the state, sent one which doesn't match the hash of the others, or it never sent it at all");
......
......@@ -1014,7 +1014,21 @@ public final class TOMLayer extends Thread implements RequestReceiver {
}
}
}
// temporary info to resume LC protocol
private int tempRegency = -1;
private LastEidData tempLastHighestEid = null;
private int tempCurrentEid = -1;
private HashSet<SignedObject> tempSignedCollects = null;
private byte[] tempPropose = null;
private int tempBatchSize = -1;
private boolean tempIAmLeader = false;
public void resumeLC() {
finalise(tempRegency, tempLastHighestEid, tempCurrentEid,
tempSignedCollects, tempPropose, tempBatchSize, tempIAmLeader);
}
// this method is called on all replicas, and serves to verify and apply the
// information sent in the catch-up message
private void finalise(int regency, LastEidData lastHighestEid,
......@@ -1030,7 +1044,20 @@ public final class TOMLayer extends Thread implements RequestReceiver {
//TODO: Case in which it is necessary to apply state transfer
System.out.println("NEEDING TO USE STATE TRANSFER!! (" + lastHighestEid.getEid() + ")");
tempRegency = regency;
tempLastHighestEid = lastHighestEid;
tempCurrentEid = currentEid;
tempSignedCollects = signedCollects;
tempPropose = propose;
tempBatchSize = batchSize;
tempIAmLeader = iAmLeader;
execManager.getStoppedMsgs().add(acceptor.getFactory().createPropose(currentEid, 0, propose, null));
stateManager.requestAppState(lastHighestEid.getEid());
return;
} else if (getLastExec() + 1 == lastHighestEid.getEid()) {
// Is this replica still executing the last decided consensus?
......@@ -1048,10 +1075,10 @@ public final class TOMLayer extends Thread implements RequestReceiver {
r.clear();
}
byte[] hash = computeHash(propose);
byte[] hash = computeHash(lastHighestEid.getEidDecision());
r.propValueHash = hash;
r.propValue = propose;
r.deserializedPropValue = checkProposedValue(propose);
r.propValue = lastHighestEid.getEidDecision();
r.deserializedPropValue = checkProposedValue(lastHighestEid.getEidDecision());
exec.decided(r, hash); // pass the decision to the delivery thread
}
byte[] tmpval = null;
......@@ -1093,9 +1120,12 @@ public final class TOMLayer extends Thread implements RequestReceiver {
r.propValue = tmpval;
r.deserializedPropValue = checkProposedValue(tmpval);
if(exec.getLearner().firstMessageProposed == null)
exec.getLearner().firstMessageProposed = r.deserializedPropValue[0];
if(exec.getLearner().firstMessageProposed == null) {
if (r.deserializedPropValue != null &&
r.deserializedPropValue.length > 0)
exec.getLearner().firstMessageProposed = r.deserializedPropValue[0];
else exec.getLearner().firstMessageProposed = new TOMMessage(); // to avoid null pointer
}
r.setWeak(me, hash);
lm.setNewReg(regency);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册