提交 877af08a 编写于 作者: L liquidsnake@sapo.pt

I have re-done some early work on the StateLog and TransferableState class

上级 6772744f
......@@ -29,11 +29,12 @@ package navigators.smart.statemanagment;
public class StateLog {
private byte[][] messageBatches; // batches received since the last checkpoint.
private int lastEid; // Execution ID for the last checkpoint
private int lastCheckpointEid; // Execution ID for the last checkpoint
private byte[] state; // State associated with the last checkpoint
private int k; // checkpoint period
private int position; // next position in the array of batches to be written
private int execCounter; // number of executions so far
//private int execCounter; // number of executions so far
private int lastEid; // Execution ID for the last messages batch delivered to the application
/**
* Constructs a State log
......@@ -43,10 +44,11 @@ public class StateLog {
this.k = k;
this.messageBatches = new byte[k][];
this.lastEid = 0;
this.lastCheckpointEid = -1;
this.state = null;
this.position = 0;
this.execCounter = 0;
//this.execCounter = 0;
this.lastEid = -1;
}
/**
......@@ -59,8 +61,8 @@ public class StateLog {
messageBatches[i] = null;
position = 0;
lastEid += k;
execCounter++;
//lastCheckpointEid += k;
//execCounter++;
this.state = state;
/************************* TESTE *************************
System.out.println("###################################");
......@@ -80,12 +82,39 @@ public class StateLog {
}
/**
* Retrieves the execution ID for the last checkpoint
* @return Execution ID for the last checkpoint, or -1 if no checkpoint was yet executed
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application, or -1 if none was delivered
*/
public int getCurrentCheckpointEid() {
public void setLastCheckpointEid(int lastCheckpointEid) {
this.lastCheckpointEid = lastCheckpointEid;
}
/**
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application, or -1 if none was delivered
*/
public int getLastCheckpointEid() {
return lastEid - 1;
return lastCheckpointEid ;
}
/**
* Sets the execution ID for the last messages batch delivered to the application
* @param lastEid the execution ID for the last messages batch delivered to the application
*/
public void setLastEid(int lastEid) {
this.lastEid = lastEid;
}
/**
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application
*/
public int getLastEid() {
return lastEid;
}
/**
......@@ -109,11 +138,11 @@ public class StateLog {
messageBatches[position] = batch;
position++;
execCounter++;
//execCounter++;
/************************* TESTE *************************/
/************************* TESTE *************************
System.out.println("posicao: " + position);
System.out.println("execucoes: " + execCounter);
System.out.println("execucoes: " + lastEid);
/************************* TESTE *************************/
return true;
......@@ -128,8 +157,8 @@ public class StateLog {
* @return The batch of messages associated with the batch correspondent execution ID
*/
public byte[] getMessageBatch(int eid) {
if (eid >= lastEid && eid <= execCounter) {
return messageBatches[eid - lastEid];
if (eid >= lastCheckpointEid && eid <= lastEid) {
return messageBatches[eid - lastCheckpointEid];
}
else return null;
}
......@@ -155,14 +184,14 @@ public class StateLog {
//System.out.println("Ultimo checkpoint: " + lastEid);
//System.exit(0);
if (eid >= lastEid && eid <= execCounter) {
if (eid >= lastCheckpointEid && eid <= lastEid) {
byte[][] batches = new byte[eid - lastEid + 1][];
byte[][] batches = new byte[eid - lastCheckpointEid + 1][];
for (int i = 0; i < (eid - lastEid + 1); i++)
for (int i = 0; i < (eid - lastCheckpointEid + 1); i++)
batches[i] = messageBatches[i];
return new TransferableState(batches, lastEid, state);
return new TransferableState(batches, lastCheckpointEid, lastEid, state);
}
else return null;
......@@ -178,10 +207,10 @@ public class StateLog {
this.messageBatches[i] = transState.getMessageBatches()[i];
}
this.lastEid = transState.getCurrentCheckpointEid() + 1;
this.lastCheckpointEid = transState.getCurrentCheckpointEid() + 1;
this.state = transState.getState();
this.execCounter = this.lastEid + position;
this.lastEid = this.lastCheckpointEid + position;
}
}
......@@ -31,8 +31,9 @@ import java.util.Arrays;
public class TransferableState implements Serializable {
private byte[][] messageBatches; // batches received since the last checkpoint.
private int nextEid; // Execution ID for the last checkpoint
private int lastCheckpointEid; // Execution ID for the last checkpoint
private byte[] state; // State associated with the last checkpoint
private int lastEid = -1; // Execution ID for the last messages batch delivered to the application
/**
* Constructs a TansferableState
......@@ -40,17 +41,18 @@ public class TransferableState implements Serializable {
* @param nextEid Execution ID for the last checkpoint
* @param state State associated with the last checkpoint
*/
public TransferableState(byte[][] messageBatches, int nextEid, byte[] state) {
public TransferableState(byte[][] messageBatches, int lastCheckpointEid, int lastEid, byte[] state) {
this.messageBatches = messageBatches; // batches received since the last checkpoint.
this.nextEid = nextEid; // Execution ID for the last checkpoint
this.lastCheckpointEid = lastCheckpointEid; // Execution ID for the last checkpoint
this.lastEid = lastEid; // Execution ID for the last messages batch delivered to the application
this.state = state; // State associated with the last checkpoint
}
public TransferableState() {
this.messageBatches = null; // batches received since the last checkpoint.
this.nextEid = 0; // Execution ID for the last checkpoint
this.lastCheckpointEid = 0; // Execution ID for the last checkpoint
this.state = null; // State associated with the last checkpoint
}
/**
......@@ -67,8 +69,8 @@ public class TransferableState implements Serializable {
* @return The batch of messages associated with the batch correspondent execution ID
*/
public byte[] getMessageBatch(int eid) {
if (eid >= nextEid && eid < (nextEid + messageBatches.length)) {
return messageBatches[eid - nextEid];
if (eid >= lastCheckpointEid && eid < (lastCheckpointEid + messageBatches.length)) {
return messageBatches[eid - lastCheckpointEid];
}
else return null;
}
......@@ -79,7 +81,16 @@ public class TransferableState implements Serializable {
*/
public int getCurrentCheckpointEid() {
return nextEid - 1;
return lastCheckpointEid;
}
/**
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application
*/
public int getLastEid() {
return lastEid;
}
/**
......@@ -106,7 +117,8 @@ public class TransferableState implements Serializable {
for (int i = 0; i < this.messageBatches.length; i++)
if (!Arrays.equals(this.messageBatches[i], tState.messageBatches[i])) return false;
return (Arrays.equals(this.state, tState.state) && tState.nextEid == this.nextEid);
return (Arrays.equals(this.state, tState.state) &&
tState.lastCheckpointEid == this.lastCheckpointEid && tState.lastEid == this.lastEid);
}
return false;
}
......@@ -114,7 +126,8 @@ public class TransferableState implements Serializable {
@Override
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.nextEid;
hash = hash * 31 + this.lastCheckpointEid;
hash = hash * 31 + this.lastEid;
if (this.state != null)
for (int i = 0; i < this.state.length; i++) hash = hash * 31 + (int) this.state[i];
else hash = hash * 31 + 0;
......
......@@ -179,12 +179,12 @@ public class DeliveryThread extends Thread {
if ((cons.getId() > 0) && (cons.getId() % conf.getCheckpoint_period() == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
byte[] state = receiver.getState();
tomLayer.saveState(state);
tomLayer.saveState(state, cons.getId());
//TODO: possivelmente fazer mais alguma coisa
}
else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
tomLayer.saveBatch(cons.getDecision());
tomLayer.saveBatch(cons.getDecision(), cons.getId());
//TODO: possivelmente fazer mais alguma coisa
}
}
......
......@@ -899,8 +899,10 @@ public final class TOMLayer extends Thread implements RequestReceiver {
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
private StateManager stateManager = null;
public void saveState(byte[] state) {
public void saveState(byte[] state, int lastEid) {
stateManager.getLog().newCheckpoint(state);
stateManager.getLog().setLastEid(lastEid);
stateManager.getLog().setLastCheckpointEid(lastEid);
/************************* TESTE *************************
int value = 0;
for (int i = 0; i < 4; i++) {
......@@ -913,8 +915,9 @@ public final class TOMLayer extends Thread implements RequestReceiver {
System.out.println("//////////////////////////////////////////////////");
/************************* TESTE *************************/
}
public void saveBatch(byte[] batch) {
public void saveBatch(byte[] batch, int lastEid) {
stateManager.getLog().addMessageBatch(batch);
stateManager.getLog().setLastEid(lastEid);
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册