提交 20f413da 编写于 作者: L liquidsnake@sapo.pt

Fixed the concurrent exception related to the code which implements checkpoints

上级 a297cf5a
......@@ -31,12 +31,14 @@ import java.io.ObjectOutputStream;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.statemanagment.SMMessage;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.timer.messages.ForwardedMessage;
import navigators.smart.tom.core.timer.messages.RTMessage;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;
/**
......@@ -79,6 +81,18 @@ public class MessageHandler {
TOMMessage request = ((ForwardedMessage) sm).getRequest();
Logger.println("(MessageHandler.processData) receiving: " + request);
tomLayer.requestReceived(request);
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
} else if (sm instanceof SMMessage) {
SMMessage smsg = (SMMessage) sm;
if (smsg.getType() == TOMUtil.SM_REQUEST) {
tomLayer.SMRequestDeliver(smsg);
}
else {
tomLayer.SMReplyDeliver(smsg);
}
/******************************************************************/
}
}
......
......@@ -31,8 +31,10 @@ import navigators.smart.paxosatwar.messages.MessageFactory;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.statemanagment.SMMessage;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;
/**
......@@ -232,6 +234,10 @@ public final class ExecutionManager {
System.out.println("- Last consensus executed: " + lastConsId);
System.out.println("##################################################################################");
//TODO: at this point a new state should be recovered from other correct replicas
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
tomLayer.requestState(me, otherAcceptors, msg.getSender(), consId);
/******************************************************************/
}
outOfContextLock.unlock();
......
......@@ -28,23 +28,26 @@ import navigators.smart.tom.core.messages.SystemMessage;
/**
* This classe represents a message used in the state transfer protocol
*
* @author João Sousa
* @author Jo�o Sousa
*/
public class SMMessage extends SystemMessage {
private TransferableState state; // State log
private int eid; // Execution ID up to which the sender needs to be updated
private int type; // Message type
/**
* Constructs a SMMessage
* @param sender Process Id of the sender
* @param eid Execution ID up to which the sender needs to be updated
* @param type Message type
* @param state State log
*/
public SMMessage(int sender, int type, TransferableState state) {
public SMMessage(int sender, int eid, int type, TransferableState state) {
super(sender);
this.state = state;
this.eid = eid;
this.type = type;
this.sender = sender;
......@@ -66,6 +69,14 @@ public class SMMessage extends SystemMessage {
return type;
}
/**
* Retrieves the execution ID up to which the sender needs to be updated
* @return The execution ID up to which the sender needs to be updated
*/
public int getEid() {
return eid;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException{
super.writeExternal(out);
......
......@@ -18,7 +18,8 @@
package navigators.smart.statemanagment;
import navigators.smart.tom.ServiceReplica;
import java.util.HashSet;
import java.util.Hashtable;
/**
* TODO: Não sei se esta classe sera usada. Para já, deixo ficar
......@@ -27,22 +28,69 @@ import navigators.smart.tom.ServiceReplica;
*/
public class StateManager {
public static final int K = 1000;
private ServiceReplica replica;
private StateLog log;
private HashSet<Message> messages = null;
private int f;
public StateManager(int k, int f) {
this.log = new StateLog(k);
messages = new HashSet<Message>();
this.f = f;
}
public void addReplica(int sender, int eid) {
messages.add(new Message(sender, eid));
}
public void emptyReplicas() {
messages.clear();
}
public boolean moreThenF(int eid) {
public StateManager(ServiceReplica replica) {
int count = 0;
HashSet<Integer> replicasCounted = new HashSet<Integer>();
this.replica = replica;
this.log = new StateLog(K);
for (Message m : messages) {
if (m.eid == eid && !replicasCounted.contains(m.sender)) {
replicasCounted.add(m.sender);
count++;
}
}
return count > f;
}
public void makeCheckpoint() {
public StateLog getLog() {
return log;
}
public void teste() {
// So para ver se isto funciona
private class Message {
private int sender;
private int eid;
Message(int sender, int eid) {
this.sender = sender;
this.eid = eid;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Message) {
Message m = (Message) obj;
return (m.eid == this.eid && m.sender == this.sender);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.sender;
hash = hash * 31 + this.eid;
return hash;
}
}
}
......@@ -44,7 +44,10 @@ import navigators.smart.paxosatwar.executionmanager.ExecutionManager;
import navigators.smart.paxosatwar.executionmanager.LeaderModule;
import navigators.smart.paxosatwar.executionmanager.Round;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.statemanagment.SMMessage;
import navigators.smart.statemanagment.StateLog;
import navigators.smart.statemanagment.StateManager;
import navigators.smart.statemanagment.TransferableState;
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.timer.RTInfo;
......@@ -159,8 +162,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
this.dt = new DeliveryThread(this, receiver, conf); // Create delivery thread
this.dt.start();
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
stateLog = new StateLog(this.conf.getCheckpoint_period());
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS E TRANSFERENCIA DE ESTADO*/
stateManager = new StateManager(this.conf.getCheckpoint_period(), this.conf.getF());
/*******************************************************/
}
......@@ -893,13 +896,40 @@ public final class TOMLayer extends Thread implements RequestReceiver {
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
private StateLog stateLog = null;
private StateManager stateManager = null;
public void saveState(byte[] state) {
stateLog.newCheckpoint(state);
stateManager.getLog().newCheckpoint(state);
}
public void saveBatch(byte[] batch) {
stateLog.addMessageBatch(batch);
stateManager.getLog().addMessageBatch(batch);
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
public void requestState(int me, int[] otherAcceptors, int sender, int eid) {
stateManager.addReplica(sender, eid);
if (stateManager.moreThenF(eid)) {
SMMessage smsg = new SMMessage(me, eid, TOMUtil.SM_REQUEST, null);
communication.send(otherAcceptors, smsg);
}
}
public TransferableState getTransferableState(int eid) {
return stateManager.getLog().getTransferableState(eid);
}
public void SMRequestDeliver(SMMessage msg) {
TransferableState state = getTransferableState(msg.getEid());
if (state != null) {
int[] targets = { msg.getSender() };
SMMessage smsg = new SMMessage(execManager.getProcessId(), -1, TOMUtil.SM_REPLY, state);
communication.send(targets, smsg);
}
}
public void SMReplyDeliver(SMMessage msg) {
}
/********************************************************/
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册