提交 d9aee962 编写于 作者: L liquidsnake@sapo.pt

Updated the state transfer protocol to also include the latest view, in order...

Updated the state transfer protocol to also include the latest view, in order to avoid delayed replicas from getting blocked after applying the state. If the view had changed in the middle of a state transfer and the view updates where already erased from the state log, the replica would alreays discard client requests, since they where for a view ahead of the one it had.

Also fixed a bug that wouldn't let the replica execute the requests contained in a state transfer log, also quase by the fact that a client might had a different view from the replica.
上级 c40f490e
......@@ -201,8 +201,8 @@ public final class ExecutionManager {
// while a replica is receiving the state of the others and updating itself
if (isRetrievingState || // Is this replica retrieving a state?
(!(lastConsId == -1 && msg.getNumber() >= (lastConsId + revivalHighMark)) && //not a recovered replica
(msg.getNumber() > lastConsId && (msg.getNumber() < (lastConsId + paxosHighMark)))) && // not an ahead of time message
!(stopped && msg.getNumber() >= (lastConsId + timeoutHighMark))) { // not a timed-out replica which needs to fetch the state
(msg.getNumber() > lastConsId && (msg.getNumber() < (lastConsId + paxosHighMark))) && // not an ahead of time message
!(stopped && msg.getNumber() >= (lastConsId + timeoutHighMark)))) { // not a timed-out replica which needs to fetch the state
if (stopped) {//just an optimization to avoid calling the lock in normal case
stoppedMsgsLock.lock();
......
......@@ -282,7 +282,7 @@ public final class Acceptor {
* @param round Round at which the decision is made
* @param value The decided value (got from WEAK or STRONG messages)
*/
private void decide(Round round, byte[] value) {
private void decide(Round round, byte[] value) {
round.getExecution().getLearner().firstMessageProposed.decisionTime = System.nanoTime();
leaderModule.decided(round.getExecution().getId(),
......
......@@ -80,5 +80,29 @@ public class View implements Serializable {
public InetSocketAddress getAddress(int id) {
return addresses.get(id);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof View) {
View v = (View) obj;
return (this.addresses.equals(v.addresses) &&
Arrays.equals(this.processes, v.processes)
&& this.id == v.id && this.f == v.f);
}
return false;
}
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.id;
hash = hash * 31 + this.f;
if (this.processes != null) {
for (int i = 0; i < this.processes.length; i++) hash = hash * 31 + this.processes[i];
} else {
hash = hash * 31 + 0;
}
hash = hash * 31 + this.addresses.hashCode();
return hash;
}
}
......@@ -24,6 +24,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import navigators.smart.communication.SystemMessage;
import navigators.smart.reconfiguration.views.View;
import navigators.smart.tom.util.TOMUtil;
......@@ -35,6 +36,7 @@ import navigators.smart.tom.util.TOMUtil;
public class SMMessage extends SystemMessage implements Externalizable {
private TransferableState state; // State log
private View view;
private int eid; // Execution ID up to which the sender needs to be updated
private int type; // Message type
private int replica; // Replica that should send the state
......@@ -51,10 +53,11 @@ public class SMMessage extends SystemMessage implements Externalizable {
* @param replica Replica that should send the state
* @param state State log
*/
public SMMessage(int sender, int eid, int type, int replica, TransferableState state, int regency, int leader) {
public SMMessage(int sender, int eid, int type, int replica, TransferableState state, View view, int regency, int leader) {
super(sender);
this.state = state;
this.view = view;
this.eid = eid;
this.type = type;
this.replica = replica;
......@@ -77,7 +80,15 @@ public class SMMessage extends SystemMessage implements Externalizable {
public TransferableState getState() {
return state;
}
/**
* Retrieves the state log
* @return The state Log
*/
public View getView() {
return view;
}
/**
* Retrieves the type of the message
* @return The type of the message
......@@ -129,6 +140,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
out.writeInt(regency);
out.writeInt(leader);
out.writeObject(state);
out.writeObject(view);
}
@Override
......@@ -142,5 +154,6 @@ public class SMMessage extends SystemMessage implements Externalizable {
regency = in.readInt();
leader = in.readInt();
state = (TransferableState) in.readObject();
view = (View) in.readObject();
}
}
......@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
import navigators.smart.paxosatwar.executionmanager.ExecutionManager;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.reconfiguration.ServerViewManager;
import navigators.smart.reconfiguration.views.View;
import navigators.smart.tom.core.DeliveryThread;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.leaderchange.LCManager;
......@@ -44,6 +45,7 @@ public class StateManager {
private StateLog log;
private HashSet<SenderEid> senderEids = null;
private HashSet<SenderState> senderStates = null;
private HashSet<SenderView> senderViews = null;
private HashSet<SenderRegency> senderRegencies = null;
private HashSet<SenderLeader> senderLeaders = null;
......@@ -78,6 +80,7 @@ public class StateManager {
this.log = new StateLog(k);
senderEids = new HashSet<SenderEid>();
senderStates = new HashSet<SenderState>();
senderViews = new HashSet<SenderView>();
senderRegencies = new HashSet<SenderRegency>();
senderLeaders = new HashSet<SenderLeader>();
......@@ -151,7 +154,9 @@ public class StateManager {
public void addLeader(int sender, int leader) {
senderLeaders.add(new SenderLeader(sender, leader));
}
public void addView(int sender, View view) {
senderViews.add(new SenderView(sender, view));
}
public void emptyRegencies() {
senderRegencies.clear();
}
......@@ -194,6 +199,23 @@ public class StateManager {
return count > SVManager.getQuorum2F();
//******* EDUARDO END **************//
}
public boolean moreThan2F_Views(View view) {
int count = 0;
HashSet<Integer> replicasCounted = new HashSet<Integer>();
for (SenderView m : senderViews) {
if (m.view.equals(view) && !replicasCounted.contains(m.sender)) {
replicasCounted.add(m.sender);
count++;
}
}
//******* EDUARDO BEGIN **************//
return count > SVManager.getQuorum2F();
//******* EDUARDO END **************//
}
public void addState(int sender, TransferableState state) {
senderStates.add(new SenderState(sender, state));
......@@ -348,7 +370,7 @@ public class StateManager {
//stateManager.emptyReplicas(eid);// this causes an exception
SMMessage smsg = new SMMessage(SVManager.getStaticConf().getProcessId(),
getWaiting(), TOMUtil.SM_REQUEST, getReplica(), null, -1, -1);
getWaiting(), TOMUtil.SM_REQUEST, getReplica(), null, null, -1, -1);
tomLayer.getCommunication().send(SVManager.getCurrentViewOtherAcceptors(), smsg);
Logger.println("(TOMLayer.requestState) I just sent a request to the other replicas for the state up to EID " + getWaiting());
......@@ -360,7 +382,7 @@ public class StateManager {
int[] myself = new int[1];
myself[0] = SVManager.getStaticConf().getProcessId();
tomLayer.getCommunication().send(myself, new SMMessage(-1, getWaiting(), TOMUtil.TRIGGER_SM_LOCALLY, -1, null, -1, -1));
tomLayer.getCommunication().send(myself, new SMMessage(-1, getWaiting(), TOMUtil.TRIGGER_SM_LOCALLY, -1, null, null, -1, -1));
......@@ -418,7 +440,7 @@ public class StateManager {
int[] targets = { msg.getSender() };
SMMessage smsg = new SMMessage(SVManager.getStaticConf().getProcessId(),
msg.getEid(), TOMUtil.SM_REPLY, -1, thisState, lcManager.getLastReg(), tomLayer.lm.getCurrentLeader());
msg.getEid(), TOMUtil.SM_REPLY, -1, thisState, SVManager.getCurrentView(), lcManager.getLastReg(), tomLayer.lm.getCurrentLeader());
// malicious code, to force the replica not to send the state
//if (reconfManager.getStaticConf().getProcessId() != 0 || !sendState)
......@@ -444,10 +466,18 @@ public class StateManager {
int currentRegency = -1;
int currentLeader = -1;
View currentView = null;
addRegency(msg.getSender(), msg.getRegency());
addLeader(msg.getSender(), msg.getLeader());
addView(msg.getSender(), msg.getView());
if (moreThan2F_Regencies(msg.getRegency())) currentRegency = msg.getRegency();
if (moreThan2F_Leaders(msg.getLeader())) currentLeader = msg.getLeader();
if (moreThan2F_Views(msg.getView())) {
currentView = msg.getView();
if (currentView.isMember(SVManager.getStaticConf().getProcessId())) {
System.out.println("Not a member anymore!");
}
}
Logger.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!");
......@@ -476,15 +506,16 @@ public class StateManager {
}
}
if (recvState != null && haveState == 1 && currentRegency > -1 && currentLeader > -1) {
if (recvState != null && haveState == 1 && currentRegency > -1 &&
currentLeader > -1 && currentView != null) {
Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
lcManager.setLastReg(currentRegency);
lcManager.setNextReg(currentRegency);
tomLayer.lm.setNewReg(currentRegency);
tomLayer.lm.setNewLeader(currentLeader);
Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
recvState.setState(getReplicaState());
lockState.lock();
......@@ -516,10 +547,13 @@ public class StateManager {
execManager.restart();
}
tomLayer.processOutOfContext();
if (SVManager.getCurrentViewId() != currentView.getId()) {
System.out.println("Installing current view!");
SVManager.reconfigureTo(currentView);
}
dt.canDeliver();
//ot.OutOfContextUnlock();
......@@ -675,4 +709,32 @@ public class StateManager {
return hash;
}
}
private class SenderView {
private int sender;
private View view;
SenderView(int sender, View view) {
this.sender = sender;
this.view = view;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof SenderView) {
SenderView m = (SenderView) obj;
return (this.view.equals(m.view) && m.sender == this.sender);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.sender;
hash = hash * 31 + this.view.hashCode();
return hash;
}
}
}
......@@ -214,7 +214,7 @@ public class ServiceReplica implements TOMReceiver {
cs.send(new int[]{tomMsg.getSender()}, tomMsg.reply);
}
public void receiveMessages(int consId, int regency, TOMMessage[] requests) {
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests) {
TOMMessage firstRequest = requests[0];
if(executor instanceof BatchExecutable) {
......@@ -230,7 +230,7 @@ public class ServiceReplica implements TOMReceiver {
List<MessageContext> msgCtxts = new ArrayList<MessageContext>();
for (TOMMessage request : requests) {
if (request.getViewID() == SVManager.getCurrentViewId()) {
if (!fromConsensus || request.getViewID() == SVManager.getCurrentViewId()) {
//If message is a request, put message in the toBatch list
if (request.getReqType() == TOMMessageType.REQUEST) {
......@@ -257,9 +257,9 @@ public class ServiceReplica implements TOMReceiver {
throw new RuntimeException("Should never reach here!");
}
} else {
} else if (fromConsensus && request.getViewID() < SVManager.getCurrentViewId()) {
// message sender had an old view, resend the message to
// him
// him (but only if it came from consensus an not state transfer)
tomLayer.getCommunication().send(
new int[] { request.getSender() },
new TOMMessage(SVManager.getStaticConf()
......@@ -290,14 +290,15 @@ public class ServiceReplica implements TOMReceiver {
byte[][] replies = ((BatchExecutable) executor).executeBatch(batch, msgContexts);
//Send the replies back to the client
for(int index = 0; index < toBatch.size(); index++){
TOMMessage request = toBatch.get(index);
request.reply = new TOMMessage(id,
request.getSession(), request.getSequence(),
if (fromConsensus) {
for(int index = 0; index < toBatch.size(); index++){
TOMMessage request = toBatch.get(index);
request.reply = new TOMMessage(id,
request.getSession(), request.getSequence(),
replies[index], SVManager.getCurrentViewId());
cs.send(new int[] { request.getSender() }, request.reply);
}
cs.send(new int[] { request.getSender() }, request.reply);
}
}
//DEBUG
navigators.smart.tom.util.Logger.println("BATCHEXECUTOR END");
}
......@@ -305,7 +306,8 @@ public class ServiceReplica implements TOMReceiver {
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) singe executor for consensus " + consId);
for (TOMMessage request: requests) {
if (request.getViewID() == SVManager.getCurrentViewId()) {
if (!fromConsensus || request.getViewID() == SVManager.getCurrentViewId()) {
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) same view");
if (request.getReqType() == TOMMessageType.REQUEST) {
......@@ -321,11 +323,14 @@ public class ServiceReplica implements TOMReceiver {
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) executing message " + request.getSequence() + " from " + request.getSender() + " decided in consensus " + consId);
response = ((SingleExecutable)executor).executeOrdered(request.getContent(), msgCtx);
// build the reply and send it to the client
request.reply = new TOMMessage(id, request.getSession(),
request.getSequence(), response, SVManager.getCurrentViewId());
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " + request.getSender());
cs.send(new int[]{request.getSender()}, request.reply);
if (fromConsensus) {
request.reply = new TOMMessage(id, request.getSession(),
request.getSequence(), response, SVManager.getCurrentViewId());
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " + request.getSender());
cs.send(new int[]{request.getSender()}, request.reply);
}
} else if (request.getReqType() == TOMMessageType.RECONFIG) {
//Reconfiguration request to be processed after the batch
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) Enqueing an update");
......@@ -333,7 +338,7 @@ public class ServiceReplica implements TOMReceiver {
} else {
throw new RuntimeException("Should never reach here!");
}
} else {
} else if (fromConsensus && request.getViewID() < SVManager.getCurrentViewId()) {
navigators.smart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending current view to " + request.getSender());
//message sender had an old view, resend the message to him
......@@ -341,7 +346,10 @@ public class ServiceReplica implements TOMReceiver {
new TOMMessage(SVManager.getStaticConf().getProcessId(),
request.getSession(), request.getSequence(),
TOMUtil.getBytes(SVManager.getCurrentView()), SVManager.getCurrentViewId()));
}
} else {
System.out.println("WTF no consenso numero " + consId);
}
}
}
}
......
......@@ -44,7 +44,7 @@ public interface TOMReceiver {
* @param regency
* @param requests The batch with TOMMessage objects.
*/
public void receiveMessages(int consId, int regency, TOMMessage[] requests);
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests);
/**
......
......@@ -139,7 +139,7 @@ public final class DeliveryThread extends Thread {
tomLayer.clientsManager.requestsOrdered(requests);
deliverMessages(eid, tomLayer.getLCManager().getLastReg(), requests);
deliverMessages(eid, tomLayer.getLCManager().getLastReg(), false, requests);
//******* EDUARDO BEGIN **************//
if (manager.hasUpdates()) {
......@@ -215,7 +215,7 @@ public final class DeliveryThread extends Thread {
//clean the ordered messages from the pending buffer
tomLayer.clientsManager.requestsOrdered(requests);
deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), requests);
deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), true, requests);
//******* EDUARDO BEGIN **************//
if (manager.hasUpdates()) {
......@@ -277,8 +277,8 @@ public final class DeliveryThread extends Thread {
receiver.receiveReadonlyMessage(request, msgCtx);
}
private void deliverMessages(int consId, int regency, TOMMessage[] requests) {
receiver.receiveMessages(consId, regency, requests);
private void deliverMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests) {
receiver.receiveMessages(consId, regency, fromConsensus, requests);
}
private void processReconfigMessages(int consId, int decisionRoundNumber) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册