提交 36f0d02a 编写于 作者: L liquidsnake@sapo.pt

More work done related to the state transfer. The replica which requested for...

More work done related to the state transfer. The replica which requested for the state is starting to process it
上级 f25b8e3a
......@@ -29,7 +29,7 @@ package navigators.smart.statemanagment;
public class StateLog {
private byte[][] messageBatches; // batches received since the last checkpoint.
private int nextEid; // Execution ID for the last checkpoint
private int lastEid; // Execution ID for the last checkpoint
private byte[] state; // State associated with the last checkpoint
private int k; // checkpoint period
private int position; // next position in the array of batches to be written
......@@ -43,7 +43,7 @@ public class StateLog {
this.k = k;
this.messageBatches = new byte[k][];
this.nextEid = 0;
this.lastEid = 0;
this.state = null;
this.position = 0;
this.execCounter = 0;
......@@ -59,7 +59,7 @@ public class StateLog {
messageBatches[i] = null;
position = 0;
nextEid += k;
lastEid += k;
execCounter++;
this.state = state;
}
......@@ -70,7 +70,7 @@ public class StateLog {
*/
public int getCurrentCheckpointEid() {
return nextEid - 1;
return lastEid - 1;
}
/**
......@@ -94,8 +94,10 @@ public class StateLog {
messageBatches[position] = batch;
//System.out.println("posicao: " + position);
//System.out.println("execucoes: " + execCounter);
/************************* TESTE *************************
System.out.println("posicao: " + position);
System.out.println("execucoes: " + execCounter);
/************************* TESTE *************************/
position++;
execCounter++;
......@@ -112,8 +114,8 @@ public class StateLog {
* @return The batch of messages associated with the batch correspondent execution ID
*/
public byte[] getMessageBatch(int eid) {
if (eid >= nextEid && eid <= execCounter) {
return messageBatches[eid - nextEid];
if (eid >= lastEid && eid <= execCounter) {
return messageBatches[eid - lastEid];
}
else return null;
}
......@@ -133,14 +135,20 @@ public class StateLog {
*/
public TransferableState getTransferableState(int eid) {
if (eid >= nextEid && eid <= execCounter) {
//System.out.println("A devolver o estado!");
//System.out.println("EID pedido: " + eid);
//System.out.println("Execucoes feitas ate agora: " + execCounter);
//System.out.println("Ultimo checkpoint: " + lastEid);
//System.exit(0);
byte[][] batches = new byte[eid - nextEid + 1][];
if (eid >= lastEid && eid <= execCounter) {
for (int i = 0; i < eid - nextEid; i++)
byte[][] batches = new byte[eid - lastEid + 1][];
for (int i = 0; i < eid - lastEid; i++)
batches[i] = messageBatches[i];
return new TransferableState(batches, nextEid, state);
return new TransferableState(batches, lastEid, state);
}
else return null;
......@@ -156,10 +164,10 @@ public class StateLog {
this.messageBatches[i] = transState.getMessageBatches()[i];
}
this.nextEid = transState.getCurrentCheckpointEid() + 1;
this.lastEid = transState.getCurrentCheckpointEid() + 1;
this.state = transState.getState();
this.execCounter = this.nextEid + position;
this.execCounter = this.lastEid + position;
}
}
......@@ -18,6 +18,7 @@
package navigators.smart.statemanagment;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Hashtable;
......@@ -29,31 +30,50 @@ import java.util.Hashtable;
public class StateManager {
private StateLog log;
private HashSet<Message> messages = null;
private HashSet<SenderEid> senderEids = null;
private HashSet<SenderState> senderStates = null;
private int f;
private int lastEid;
private boolean wait;
public StateManager(int k, int f) {
this.log = new StateLog(k);
messages = new HashSet<Message>();
senderEids = new HashSet<SenderEid>();
senderStates = new HashSet<SenderState>();
this.f = f;
this.lastEid = -1;
this.wait = false;
}
public void addReplica(int sender, int eid) {
messages.add(new Message(sender, eid));
public void addEID(int sender, int eid) {
senderEids.add(new SenderEid(sender, eid));
}
public void emptyReplicas() {
messages.clear();
public void emptyEIDs() {
senderEids.clear();
}
public void emptyReplicas(int eid) {
for (Message m : messages)
if (m.eid <= eid) messages.remove(m);
public void emptyEIDs(int eid) {
for (SenderEid m : senderEids)
if (m.eid <= eid) senderEids.remove(m);
}
public void addState(int sender, TransferableState state) {
senderStates.add(new SenderState(sender, state));
}
public void emptyStates() {
senderStates.clear();
}
public boolean isWaiting() {
return wait;
}
public void setWaiting(boolean wait) {
this.wait = wait;
}
public void setLastEID(int eid) {
lastEid = eid;
}
......@@ -62,12 +82,12 @@ public class StateManager {
return lastEid;
}
public boolean moreThenF(int eid) {
public boolean moreThenF_EIDs(int eid) {
int count = 0;
HashSet<Integer> replicasCounted = new HashSet<Integer>();
for (Message m : messages) {
for (SenderEid m : senderEids) {
if (m.eid == eid && !replicasCounted.contains(m.sender)) {
replicasCounted.add(m.sender);
count++;
......@@ -76,25 +96,39 @@ public class StateManager {
return count > f;
}
public boolean moreThenF_States(TransferableState state) {
int count = 0;
HashSet<Integer> replicasCounted = new HashSet<Integer>();
for (SenderState m : senderStates) {
if (m.state.equals(state) && !replicasCounted.contains(m.sender)) {
replicasCounted.add(m.sender);
count++;
}
}
return count > f;
}
public StateLog getLog() {
return log;
}
private class Message {
private class SenderEid {
private int sender;
private int eid;
Message(int sender, int eid) {
SenderEid(int sender, int eid) {
this.sender = sender;
this.eid = eid;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Message) {
Message m = (Message) obj;
if (obj instanceof SenderEid) {
SenderEid m = (SenderEid) obj;
return (m.eid == this.eid && m.sender == this.sender);
}
return false;
......@@ -108,4 +142,32 @@ public class StateManager {
return hash;
}
}
private class SenderState {
private int sender;
private TransferableState state;
SenderState(int sender, TransferableState state) {
this.sender = sender;
this.state = state;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof SenderState) {
SenderState m = (SenderState) obj;
return (this.state.equals(m.state) && m.sender == this.sender);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.sender;
hash = hash * 31 + this.state.hashCode();
return hash;
}
}
}
......@@ -19,6 +19,7 @@
package navigators.smart.statemanagment;
import java.io.Serializable;
import java.util.Arrays;
/**
* This classe represents a state tranfered from a replica to another. The state associated with the last
......@@ -48,7 +49,9 @@ public class TransferableState implements Serializable {
}
public TransferableState() {
this.messageBatches = null; // batches received since the last checkpoint.
this.nextEid = 0; // Execution ID for the last checkpoint
this.state = null; // State associated with the last checkpoint
}
/**
* Retrieves all batches of messages
......@@ -86,4 +89,27 @@ public class TransferableState implements Serializable {
public byte[] getState() {
return state;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TransferableState) {
TransferableState tState = (TransferableState) obj;
if (this.messageBatches.length != tState.messageBatches.length) return false;
for (int i = 0; i < this.messageBatches.length; i++)
if (!Arrays.equals(this.messageBatches[i], tState.messageBatches[i])) return false;
return (Arrays.equals(this.state, tState.state) && tState.nextEid == this.nextEid);
}
return false;
}
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.nextEid;
for (int i = 0; i < this.state.length; i++) hash = hash * 31 + (int) this.state[i];
for (int i = 0; i < this.messageBatches.length; i++)
for (int j = 0; j < this.messageBatches[i].length; j++)
hash = hash * 31 + (int) this.messageBatches[i][j];
return hash;
}
}
......@@ -919,13 +919,25 @@ public final class TOMLayer extends Thread implements RequestReceiver {
public void requestState(int me, int[] otherAcceptors, int sender, int eid) {
stateManager.addReplica(sender, eid);
if (stateManager.moreThenF(eid) && stateManager.getLastEID() < eid) {
if (!stateManager.isWaiting()) {
stateManager.addEID(sender, eid);
if (stateManager.moreThenF_EIDs(eid) && stateManager.getLastEID() < eid) {
stateManager.setLastEID(eid);
//stateManager.emptyReplicas(eid);// isto causa uma excepcao
SMMessage smsg = new SMMessage(me, eid, TOMUtil.SM_REQUEST, null);
communication.send(otherAcceptors, smsg);
stateManager.setLastEID(eid);
stateManager.setWaiting(true);
//stateManager.emptyReplicas(eid);// isto causa uma excepcao
SMMessage smsg = new SMMessage(me, eid, TOMUtil.SM_REQUEST, null);
communication.send(otherAcceptors, smsg);
/************************* TESTE *************************
System.out.println("Enviei um pedido!");
System.out.println("Quem envia: " + smsg.getSender());
System.out.println("Que tipo: " + smsg.getType());
System.out.println("Que EID: " + smsg.getEid());
/************************* TESTE *************************/
}
}
}
......@@ -935,26 +947,22 @@ public final class TOMLayer extends Thread implements RequestReceiver {
public void SMRequestDeliver(SMMessage msg) {
TransferableState state = getTransferableState(msg.getEid());
if (state == null) state = new TransferableState();
int[] targets = { msg.getSender() };
SMMessage smsg = new SMMessage(execManager.getProcessId(), msg.getEid(), TOMUtil.SM_REPLY, state);
communication.send(targets, smsg);
/************************* TESTE *************************
System.out.println("Recebi um pedido de estado!");
System.out.println("Quem enviou: " + msg.getSender());
System.out.println("Que tipo: " + msg.getType());
System.out.println("Que EID: " + msg.getEid());
System.exit(0);
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("Quem envia: " + smsg.getSender());
System.out.println("Que tipo: " + smsg.getType());
System.out.println("Que EID: " + smsg.getEid());
//System.exit(0);
/************************* TESTE *************************/
TransferableState state = getTransferableState(msg.getEid());
if (state != null) {
int[] targets = { msg.getSender() };
SMMessage smsg = new SMMessage(execManager.getProcessId(), msg.getEid(), TOMUtil.SM_REPLY, state);
communication.send(targets, smsg);
}
}
public void SMReplyDeliver(SMMessage msg) {
......@@ -964,7 +972,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
System.out.println("Recebi a minha resposta!");
System.out.println("Quem enviou: " + msg.getSender());
System.out.println("Que tipo: " + msg.getType());
System.out.println("Que EID: " + msg.getEid());
System.out.println("Que EID pedido: " + msg.getEid());
System.out.println("Que EID do estado: " + msg.getState().getCurrentCheckpointEid());
System.exit(0);
try {
Thread.sleep(10000);
......@@ -972,6 +981,24 @@ public final class TOMLayer extends Thread implements RequestReceiver {
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
}
/************************* TESTE *************************/
if (stateManager.isWaiting() && msg.getEid() == stateManager.getLastEID()) {
stateManager.addState(msg.getSender(),msg.getState());
if (stateManager.moreThenF_States(msg.getState())) {
if (msg.getState().getCurrentCheckpointEid() > -1) {
stateManager.getLog().update(msg.getState());
}
else {
}
stateManager.setLastEID(-1);
stateManager.setWaiting(false);
}
}
}
/********************************************************/
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册