提交 2befb2cb 编写于 作者: L liquidsnake@sapo.pt

Fixed a bug on the leader election. It didn't work well together with the...

Fixed a bug on the leader election. It didn't work well together with the reconfiguration. Modified the LC protocol plus the state transfer to fix this issue.
上级 049ea6f8
......@@ -39,6 +39,7 @@ public class LeaderModule {
// este e a nova maneira de guardar info sobre o lider, desacoplada do consenso
private ServerViewManager reconfManager;
private int currentTS;
private int currentLeader;
/**
* Creates a new instance of LeaderModule
*/
......@@ -47,6 +48,7 @@ public class LeaderModule {
addLeaderInfo(0, 0, 0);
this.reconfManager = reconfManager;
currentTS = 0;
currentLeader = 0;
}
/**
......@@ -78,12 +80,16 @@ public class LeaderModule {
this.currentTS = ts;
}
public void setNewLeader (int leader) {
this.currentLeader = leader;
}
/**
* Obtem o lider currente, a partir do timestamp que tem
* @return Lider currente
*/
public int getCurrentLeader() {
return (currentTS % this.reconfManager.getCurrentViewN());
//return (currentTS % this.reconfManager.getCurrentViewN());
return currentLeader;
}
/**
* Retrieves the tuple for the specified round, given a list of tuples
......
......@@ -286,8 +286,8 @@ public final class Acceptor {
round.getExecution().getLearner().firstMessageProposed.decisionTime = System.nanoTime();
leaderModule.decided(round.getExecution().getId(),
leaderModule.getLeader(round.getExecution().getId(),
round.getNumber()));
tomLayer.lm.getCurrentLeader()/*leaderModule.getLeader(round.getExecution().getId(),
round.getNumber())*/);
round.getExecution().decided(round, value);
}
......
......@@ -243,7 +243,7 @@ public class ServerViewManager extends ViewManager {
}
return TOMUtil.getBytes(new ReconfigureReply(newV, jSetInfo.toArray(new String[0]),
eid, tomLayer.lm.getLeader(eid, decisionRound)));
eid, tomLayer.lm.getCurrentLeader() /*tomLayer.lm.getLeader(eid, decisionRound)*/));
}
public TOMMessage[] clearUpdates() {
......
......@@ -39,6 +39,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
private int type; // Message type
private int replica; // Replica that should send the state
private int regency; // Current regency
private int leader; // Current leader
public final boolean TRIGGER_SM_LOCALLY; // indicates that the replica should
// initiate the SM protocol locally
......@@ -50,7 +51,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
* @param replica Replica that should send the state
* @param state State log
*/
public SMMessage(int sender, int eid, int type, int replica, TransferableState state, int regency) {
public SMMessage(int sender, int eid, int type, int replica, TransferableState state, int regency, int leader) {
super(sender);
this.state = state;
......@@ -59,6 +60,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
this.replica = replica;
this.sender = sender;
this.regency = regency;
this.leader = leader;
if (type == TOMUtil.TRIGGER_SM_LOCALLY && sender == -1) this.TRIGGER_SM_LOCALLY = true;
else this.TRIGGER_SM_LOCALLY = false;
......@@ -107,7 +109,15 @@ public class SMMessage extends SystemMessage implements Externalizable {
public int getRegency() {
return regency;
}
/**
* Retrieves the leader that the replica had when sending the state
* @return The leader that the replica had when sending the state
*/
public int getLeader() {
return leader;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException{
super.writeExternal(out);
......@@ -117,6 +127,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
out.writeInt(type);
out.writeInt(replica);
out.writeInt(regency);
out.writeInt(leader);
out.writeObject(state);
}
......@@ -129,6 +140,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
type = in.readInt();
replica = in.readInt();
regency = in.readInt();
leader = in.readInt();
state = (TransferableState) in.readObject();
}
}
......@@ -42,6 +42,7 @@ public class StateManager {
private HashSet<SenderEid> senderEids = null;
private HashSet<SenderState> senderStates = null;
private HashSet<SenderRegency> senderRegencies = null;
private HashSet<SenderLeader> senderLeaders = null;
private ReentrantLock lockState = new ReentrantLock();
private ReentrantLock lockTimer = new ReentrantLock();
......@@ -73,6 +74,7 @@ public class StateManager {
senderEids = new HashSet<SenderEid>();
senderStates = new HashSet<SenderState>();
senderRegencies = new HashSet<SenderRegency>();
senderLeaders = new HashSet<SenderLeader>();
this.replica = 0;
......@@ -140,7 +142,11 @@ public class StateManager {
public void addRegency(int sender, int regency) {
senderRegencies.add(new SenderRegency(sender, regency));
}
public void addLeader(int sender, int leader) {
senderLeaders.add(new SenderLeader(sender, leader));
}
public void emptyRegencies() {
senderRegencies.clear();
}
......@@ -166,7 +172,24 @@ public class StateManager {
return count > SVManager.getQuorum2F();
//******* EDUARDO END **************//
}
public boolean moreThan2F_Leaders(int leader) {
int count = 0;
HashSet<Integer> replicasCounted = new HashSet<Integer>();
for (SenderLeader m : senderLeaders) {
if (m.leader == leader && !replicasCounted.contains(m.sender)) {
replicasCounted.add(m.sender);
count++;
}
}
//******* EDUARDO BEGIN **************//
return count > SVManager.getQuorum2F();
//******* EDUARDO END **************//
}
public void addState(int sender, TransferableState state) {
senderStates.add(new SenderState(sender, state));
}
......@@ -320,7 +343,7 @@ public class StateManager {
//stateManager.emptyReplicas(eid);// isto causa uma excepcao
SMMessage smsg = new SMMessage(SVManager.getStaticConf().getProcessId(),
getWaiting(), TOMUtil.SM_REQUEST, getReplica(), null, -1);
getWaiting(), TOMUtil.SM_REQUEST, getReplica(), null, -1, -1);
tomLayer.getCommunication().send(SVManager.getCurrentViewOtherAcceptors(), smsg);
Logger.println("(TOMLayer.requestState) I just sent a request to the other replicas for the state up to EID " + getWaiting());
......@@ -332,7 +355,7 @@ public class StateManager {
int[] myself = new int[1];
myself[0] = SVManager.getStaticConf().getProcessId();
tomLayer.getCommunication().send(myself, new SMMessage(-1, getWaiting(), TOMUtil.TRIGGER_SM_LOCALLY, -1, null, -1));
tomLayer.getCommunication().send(myself, new SMMessage(-1, getWaiting(), TOMUtil.TRIGGER_SM_LOCALLY, -1, null, -1, -1));
......@@ -390,7 +413,7 @@ public class StateManager {
int[] targets = { msg.getSender() };
SMMessage smsg = new SMMessage(SVManager.getStaticConf().getProcessId(),
msg.getEid(), TOMUtil.SM_REPLY, -1, thisState, lcManager.getLastReg());
msg.getEid(), TOMUtil.SM_REPLY, -1, thisState, lcManager.getLastReg(), tomLayer.lm.getCurrentLeader());
// malicious code, to force the replica not to send the state
//if (reconfManager.getStaticConf().getProcessId() != 0 || !sendState)
......@@ -415,9 +438,12 @@ public class StateManager {
if (getWaiting() != -1 && msg.getEid() == getWaiting()) {
int currentRegency = -1;
int currentLeader = -1;
addRegency(msg.getSender(), msg.getRegency());
addLeader(msg.getSender(), msg.getLeader());
if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency();
if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader();
Logger.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!");
if (msg.getSender() == getReplica() && msg.getState().getState() != null) {
......@@ -445,11 +471,12 @@ public class StateManager {
}
}
if (recvState != null && haveState == 1 && currentRegency > -1) {
if (recvState != null && haveState == 1 && currentRegency > -1 && currentLeader > -1) {
lcManager.setLastReg(currentRegency);
lcManager.setNextReg(currentRegency);
tomLayer.lm.setNewReg(currentRegency);
tomLayer.lm.setNewLeader(currentLeader);
Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
......@@ -523,9 +550,9 @@ public class StateManager {
@Override
public boolean equals(Object obj) {
if (obj instanceof SenderEid) {
SenderEid m = (SenderEid) obj;
return (m.eid == this.regency && m.sender == this.sender);
if (obj instanceof SenderRegency) {
SenderRegency m = (SenderRegency) obj;
return (m.regency == this.regency && m.sender == this.sender);
}
return false;
}
......@@ -538,6 +565,34 @@ public class StateManager {
return hash;
}
}
private class SenderLeader {
private int sender;
private int leader;
SenderLeader(int sender, int leader) {
this.sender = sender;
this.leader = leader;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof SenderLeader) {
SenderLeader m = (SenderLeader) obj;
return (m.leader == this.leader && m.sender == this.sender);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.sender;
hash = hash * 31 + this.leader;
return hash;
}
}
private class SenderEid {
......
......@@ -298,11 +298,11 @@ public final class DeliveryThread extends Thread {
if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
byte[] state = receiver.getState();
tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
//TODO: possivelmente fazer mais alguma coisa
} else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber()));
tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
//TODO: possivelmente fazer mais alguma coisa
}
}
......
......@@ -618,10 +618,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
requestsTimer.startTimer();
int leader = regency % this.reconfManager.getCurrentViewN(); // novo lider
//int leader = regency % this.reconfManager.getCurrentViewN(); // novo lider
int leader = lcManager.getNewLeader();
int in = getInExec(); // eid a executar
int last = getLastExec(); // ultimo eid decidido
lm.setNewReg(regency);
lm.setNewLeader(leader);
// Se eu nao for o lider, tenho que enviar uma mensagem STOPDATA para ele
if (leader != this.reconfManager.getStaticConf().getProcessId()) {
......@@ -677,8 +681,6 @@ public final class TOMLayer extends Thread implements RequestReceiver {
out.close();
bos.close();
lm.setNewReg(regency);
int[] b = new int[1];
b[0] = leader;
......@@ -801,7 +803,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
// Sou o novo lider e estou a espera destas mensagem?
if (regency == lcManager.getLastReg() &&
this.reconfManager.getStaticConf().getProcessId() == (regency % this.reconfManager.getCurrentViewN())) {
this.reconfManager.getStaticConf().getProcessId() == lm.getCurrentLeader()/*(regency % this.reconfManager.getCurrentViewN())*/) {
//TODO: E preciso verificar a prova do ultimo consenso decidido e a assinatura do estado do consenso actual!
......@@ -877,7 +879,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
// Estou a espera desta mensagem, e recebi-a do novo lider?
if (msg.getReg() == lcManager.getLastReg() &&
msg.getReg() == lcManager.getNextReg() && msg.getSender() == (regency % this.reconfManager.getCurrentViewN())) {
msg.getReg() == lcManager.getNextReg() && msg.getSender() == lm.getCurrentLeader()/*(regency % this.reconfManager.getCurrentViewN())*/) {
LastEidData lastHighestEid = null;
int currentEid = -1;
......
......@@ -59,6 +59,8 @@ public class LCManager {
private ServerViewManager SVManager;
private MessageDigest md;
private TOMLayer tomLayer;
private int currentLeader;
/**
* Constructor
......@@ -70,6 +72,7 @@ public class LCManager {
this.tomLayer = tomLayer;
this.lastreg = 0;
this.nextreg = 0;
this.currentLeader = 0;
this.stops = new HashMap<Integer,HashSet<Integer>>();
this.lastEids = new HashMap<Integer, HashSet<LastEidData>>();
......@@ -78,7 +81,34 @@ public class LCManager {
this.SVManager = reconfManager;
this.md = md;
}
public int getNewLeader() {
int[] proc = SVManager.getCurrentViewProcesses();
int minProc = proc[0];
int maxProc = proc[0];
System.out.println("PROCESSOS: ");
for (int i = 0; i < proc.length; i++)
System.out.println(proc[i]);
for (int p : proc) {
if (p < minProc) minProc = p;
if (p > maxProc) maxProc = p;
}
do {
currentLeader++;
if (currentLeader > maxProc) {
currentLeader = minProc;
}
} while(!SVManager.isCurrentViewMember(currentLeader));
return currentLeader;
}
/**
* This is meant to keep track of timed out messages in this replica
*
......
......@@ -54,7 +54,7 @@ public class ShutdownHookThread extends Thread {
Round r = manager.getExecution(tomLayer.getLastExec()).getLastRound();
//******* EDUARDO BEGIN **************//
if(r != null){
System.err.println("Last executed leader: " + lm.getLeader(r.getExecution().getId(),r.getNumber()));
System.err.println("Last executed leader: " + tomLayer.lm.getCurrentLeader()/*lm.getLeader(r.getExecution().getId(),r.getNumber())*/);
System.err.println("State of the last executed round: "+r.toString());
}
//******* EDUARDO END **************//
......@@ -62,7 +62,7 @@ public class ShutdownHookThread extends Thread {
if(tomLayer.getInExec() != -1) {
Round r2 = manager.getExecution(tomLayer.getInExec()).getLastRound();
if(r2 != null) {
System.out.println("Consensus in execution leader: " + lm.getLeader(r2.getExecution().getId(),r.getNumber()));
System.out.println("Consensus in execution leader: " + tomLayer.lm.getCurrentLeader()/*lm.getLeader(r2.getExecution().getId(),r.getNumber())*/);
System.err.println("State of the round in execution: "+r2.toString());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册