提交 189bd8f2 编写于 作者: L liquidsnake@sapo.pt

Started changing the state transfer protocol to not perform checkpointing....

Started changing the state transfer protocol to not perform checkpointing. Such procedure should be delegated to the applcation instead.

THIS IS AN UNSTABLE COMMIT!!!!!! USE AT OWN RISK!!!!!!!!!!!!!!!!!!
上级 4e1b9021
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.statemanagment;
import java.io.Serializable;
import java.util.Arrays;
/**
* This classe represents a state tranfered from a replica to another. The state associated with the last
* checkpoint together with all the batches of messages received do far, comprises the sender's
* current state
*
* IMPORTANT: The hash state MUST ALWAYS be present, regardless if the replica is supposed to
* send the complete state or not
*
* @author Jo�o Sousa
*/
public interface ApplicationState extends Serializable {
/**
* The consensus of the last batch of commands which the application was given
* @return consensus of the last batch of commands which the application was given
*/
public int getLastEid();
/**
* Indicates if the sender replica had the state requested by the recovering replica
* @return true if the sender replica had the state requested by the recovering replica, false otherwise
*/
public boolean hasState();
/**
* Sets a byte array that must be a representation of the application state
* @param state a byte array that must be a representation of the application state
*/
public void setSerializedState(byte[] state);
/**
* Byte array that must be a representation of the application state
* @returns A byte array that must be a representation of the application state
*/
public byte[] getSerializedState();
/**
* Gets an secure hash of the application state
* @return Secure hash of the application state
*/
public byte[] getStateHash();
/**
* This method MUST be implemented. However, the attribute returned by getSerializedState()
* should be ignored, and getStateHash() should be used instead
*/
public abstract boolean equals(Object obj);
/**
* This method MUST be implemented. However, the attribute returned by getSerializedState()
* should be ignored, and getStateHash() should be used instead
*/
public abstract int hashCode();
}
......@@ -35,7 +35,7 @@ import navigators.smart.tom.util.TOMUtil;
*/
public class SMMessage extends SystemMessage implements Externalizable {
private TransferableState state; // State log
private ApplicationState state; // State log
private View view;
private int eid; // Execution ID up to which the sender needs to be updated
private int type; // Message type
......@@ -53,7 +53,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
* @param replica Replica that should send the state
* @param state State log
*/
public SMMessage(int sender, int eid, int type, int replica, TransferableState state, View view, int regency, int leader) {
public SMMessage(int sender, int eid, int type, int replica, ApplicationState state, View view, int regency, int leader) {
super(sender);
this.state = state;
......@@ -77,7 +77,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
* Retrieves the state log
* @return The state Log
*/
public TransferableState getState() {
public ApplicationState getState() {
return state;
}
......@@ -153,7 +153,7 @@ public class SMMessage extends SystemMessage implements Externalizable {
replica = in.readInt();
regency = in.readInt();
leader = in.readInt();
state = (TransferableState) in.readObject();
state = (ApplicationState) in.readObject();
view = (View) in.readObject();
}
}
......@@ -31,6 +31,7 @@ import navigators.smart.reconfiguration.views.View;
import navigators.smart.tom.core.DeliveryThread;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.leaderchange.LCManager;
import navigators.smart.tom.server.Recoverable;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;
......@@ -42,7 +43,6 @@ import navigators.smart.tom.util.TOMUtil;
*/
public class StateManager {
private StateLog log;
private HashSet<SenderEid> senderEids = null;
private HashSet<SenderState> senderStates = null;
private HashSet<SenderView> senderViews = null;
......@@ -57,7 +57,7 @@ public class StateManager {
private int lastEid;
private int waitingEid;
private int replica;
private byte[] state;
private ApplicationState state;
private ServerViewManager SVManager;
private TOMLayer tomLayer;
......@@ -77,7 +77,6 @@ public class StateManager {
this.lcManager = lcManager;
this.execManager = execManager;
this.log = new StateLog(k);
senderEids = new HashSet<SenderEid>();
senderStates = new HashSet<SenderState>();
senderViews = new HashSet<SenderView>();
......@@ -109,11 +108,11 @@ public class StateManager {
} while (replica == SVManager.getStaticConf().getProcessId());
}
public void setReplicaState(byte[] state) {
public void setReplicaState(ApplicationState state) {
this.state = state;
}
public byte[] getReplicaState() {
public ApplicationState getReplicaState() {
return state;
}
......@@ -217,7 +216,7 @@ public class StateManager {
//******* EDUARDO END **************//
}
public void addState(int sender, TransferableState state) {
public void addState(int sender, ApplicationState state) {
senderStates.add(new SenderState(sender, state));
}
......@@ -258,7 +257,7 @@ public class StateManager {
//******* EDUARDO END **************//
}
private TransferableState getValidHash() {
private ApplicationState getValidHash() {
SenderState[] st = new SenderState[senderStates.size()];
senderStates.toArray(st);
......@@ -300,45 +299,6 @@ public class StateManager {
return senderStates.size();
}
public StateLog getLog() {
return log;
}
public void saveState(byte[] state, int lastEid, int decisionRound, int leader) {
StateLog thisLog = getLog();
lockState.lock();
Logger.println("(TOMLayer.saveState) Saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
thisLog.newCheckpoint(state, tomLayer.computeHash(state));
thisLog.setLastEid(-1);
thisLog.setLastCheckpointEid(lastEid);
thisLog.setLastCheckpointRound(decisionRound);
thisLog.setLastCheckpointLeader(leader);
lockState.unlock();
Logger.println("(TOMLayer.saveState) Finished saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
public void saveBatch(byte[] batch, int lastEid, int decisionRound, int leader) {
StateLog thisLog = getLog();
lockState.lock();
Logger.println("(TOMLayer.saveBatch) Saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
thisLog.addMessageBatch(batch, decisionRound, leader);
thisLog.setLastEid(lastEid);
lockState.unlock();
Logger.println("(TOMLayer.saveBatch) Finished saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
public void analyzeState(int sender, int eid) {
Logger.println("(TOMLayer.analyzeState) The state transfer protocol is enabled");
......@@ -415,27 +375,29 @@ public class StateManager {
public void SMRequestDeliver(SMMessage msg) {
System.out.println("(TOMLayer.SMRequestDeliver) invoked method");
//******* EDUARDO BEGIN **************//
if (SVManager.getStaticConf().isStateTransferEnabled()) {
if (SVManager.getStaticConf().isStateTransferEnabled() && dt.getRecoverer() != null) {
//******* EDUARDO END **************//
Logger.println("(TOMLayer.SMRequestDeliver) The state transfer protocol is enabled");
System.out.println("(TOMLayer.SMRequestDeliver) The state transfer protocol is enabled");
lockState.lock();
Logger.println("(TOMLayer.SMRequestDeliver) I received a state request for EID " + msg.getEid() + " from replica " + msg.getSender());
System.out.println("(TOMLayer.SMRequestDeliver) I received a state request for EID " + msg.getEid() + " from replica " + msg.getSender());
boolean sendState = msg.getReplica() == SVManager.getStaticConf().getProcessId();
if (sendState) Logger.println("(TOMLayer.SMRequestDeliver) I should be the one sending the state");
TransferableState thisState = getLog().getTransferableState(msg.getEid(), sendState);
if (sendState) System.out.println("(TOMLayer.SMRequestDeliver) I should be the one sending the state");
//TransferableState thisState = getLog().getTransferableState(msg.getEid(), sendState);
ApplicationState thisState = dt.getRecoverer().getState(msg.getEid(), sendState);
lockState.unlock();
if (thisState == null) {
Logger.println("(TOMLayer.SMRequestDeliver) I don't have the state requested :-(");
System.out.println("(TOMLayer.SMRequestDeliver) I don't have the state requested :-(");
thisState = new TransferableState();
thisState = dt.getRecoverer().getState(-1, sendState);
}
int[] targets = { msg.getSender() };
......@@ -446,7 +408,7 @@ public class StateManager {
//if (reconfManager.getStaticConf().getProcessId() != 0 || !sendState)
tomLayer.getCommunication().send(targets, smsg);
Logger.println("(TOMLayer.SMRequestDeliver) I sent the state for checkpoint " + thisState.getLastCheckpointEid() + " with batches until EID " + thisState.getLastEid());
System.out.println("(TOMLayer.SMRequestDeliver) I sent the state until EID " + thisState.getLastEid());
}
}
......@@ -459,8 +421,8 @@ public class StateManager {
if (SVManager.getStaticConf().isStateTransferEnabled()) {
//******* EDUARDO END **************//
Logger.println("(TOMLayer.SMReplyDeliver) The state transfer protocol is enabled");
Logger.println("(TOMLayer.SMReplyDeliver) I received a state reply for EID " + msg.getEid() + " from replica " + msg.getSender());
System.out.println("(TOMLayer.SMReplyDeliver) The state transfer protocol is enabled");
System.out.println("(TOMLayer.SMReplyDeliver) I received a state reply for EID " + msg.getEid() + " from replica " + msg.getSender());
if (getWaiting() != -1 && msg.getEid() == getWaiting()) {
......@@ -479,11 +441,11 @@ public class StateManager {
}
}
Logger.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!");
System.out.println("(TOMLayer.SMReplyDeliver) The reply is for the EID that I want!");
if (msg.getSender() == getReplica() && msg.getState().getState() != null) {
Logger.println("(TOMLayer.SMReplyDeliver) I received the state, from the replica that I was expecting");
setReplicaState(msg.getState().getState());
if (msg.getSender() == getReplica() && msg.getState().getSerializedState() != null) {
System.out.println("(TOMLayer.SMReplyDeliver) I received the state, from the replica that I was expecting");
setReplicaState(msg.getState());
if (stateTimer != null) stateTimer.cancel();
}
......@@ -491,14 +453,14 @@ public class StateManager {
if (moreThanF_Replies()) {
Logger.println("(TOMLayer.SMReplyDeliver) I have at least " + SVManager.getCurrentViewF() + " replies!");
System.out.println("(TOMLayer.SMReplyDeliver) I have at least " + SVManager.getCurrentViewF() + " replies!");
TransferableState recvState = getValidHash();
ApplicationState recvState = getValidHash();
int haveState = 0;
if (getReplicaState() != null) {
byte[] hash = null;
hash = tomLayer.computeHash(getReplicaState());
hash = tomLayer.computeHash(getReplicaState().getSerializedState());
if (recvState != null) {
if (Arrays.equals(hash, recvState.getStateHash())) haveState = 1;
else if (getNumValidHashes() > SVManager.getCurrentViewF()) haveState = -1;
......@@ -509,20 +471,21 @@ public class StateManager {
if (recvState != null && haveState == 1 && currentRegency > -1 &&
currentLeader > -1 && currentView != null) {
Logger.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
System.out.println("(TOMLayer.SMReplyDeliver) The state of those replies is good!");
lcManager.setLastReg(currentRegency);
lcManager.setNextReg(currentRegency);
tomLayer.lm.setNewReg(currentRegency);
tomLayer.lm.setNewLeader(currentLeader);
recvState.setState(getReplicaState());
recvState.setSerializedState(getReplicaState().getSerializedState());
lockState.lock();
// ISTO E PRECISO METER NA APLICACAO!!!!!!
/*lockState.lock();
getLog().update(recvState);
lockState.unlock();
lockState.unlock();*/
dt.deliverLock();
......@@ -530,7 +493,7 @@ public class StateManager {
setWaiting(-1);
dt.update(recvState);
dt.update(msg.getEid(), recvState);
//Deal with stopped messages that may come from synchronization phase
if (execManager.stopped()) {
......@@ -571,7 +534,7 @@ public class StateManager {
} else if (recvState == null && (SVManager.getCurrentViewN() / 2) < getReplies()) {
//******* EDUARDO END **************//
Logger.println("(TOMLayer.SMReplyDeliver) I have more than " +
System.out.println("(TOMLayer.SMReplyDeliver) I have more than " +
(SVManager.getCurrentViewN() / 2) + " messages that are no good!");
setWaiting(-1);
......@@ -582,7 +545,7 @@ public class StateManager {
if (stateTimer != null) stateTimer.cancel();
} else if (haveState == -1) {
Logger.println("(TOMLayer.SMReplyDeliver) The replica from which I expected the state, sent one which doesn't match the hash of the others, or it never sent it at all");
System.out.println("(TOMLayer.SMReplyDeliver) The replica from which I expected the state, sent one which doesn't match the hash of the others, or it never sent it at all");
//setWaiting(-1);
changeReplica();
......@@ -685,9 +648,9 @@ public class StateManager {
private class SenderState {
private int sender;
private TransferableState state;
private ApplicationState state;
SenderState(int sender, TransferableState state) {
SenderState(int sender, ApplicationState state) {
this.sender = sender;
this.state = state;
}
......@@ -705,7 +668,10 @@ public class StateManager {
public int hashCode() {
int hash = 1;
hash = hash * 31 + this.sender;
hash = hash * 31 + this.state.hashCode();
if (this.state != null) {
hash = hash * 31 + this.state.hashCode();
}
else hash = hash * 31 + 0;
return hash;
}
}
......
......@@ -214,7 +214,7 @@ public class ServiceReplica implements TOMReceiver {
cs.send(new int[]{tomMsg.getSender()}, tomMsg.reply);
}
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests) {
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests, byte[] decision) {
TOMMessage firstRequest = requests[0];
if(executor instanceof BatchExecutable) {
......@@ -280,6 +280,7 @@ public class ServiceReplica implements TOMReceiver {
int line = 0;
for(TOMMessage m : toBatch){
batch[line] = m.getContent();
line++;
}
MessageContext[] msgContexts = new MessageContext[msgCtxts.size()];
......@@ -373,8 +374,8 @@ public class ServiceReplica implements TOMReceiver {
/** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */
@Override
public byte[] getState() { //TODO: Here is race condition!
/*@Override
public byte[] getState() {
requestsLock.lock();
byte[] state = recoverer.getState();
requestsLock.unlock();
......@@ -387,7 +388,7 @@ public class ServiceReplica implements TOMReceiver {
requestsLock.lock();
recoverer.setState(state);
requestsLock.unlock();
}
}*/
/**
* This method initializes the object
......@@ -423,7 +424,7 @@ public class ServiceReplica implements TOMReceiver {
acceptor.setManager(manager);
proposer.setManager(manager);
tomLayer = new TOMLayer(manager, this, lm, acceptor, cs, SVManager);
tomLayer = new TOMLayer(manager, this, recoverer, lm, acceptor, cs, SVManager);
manager.setTOMLayer(tomLayer);
......
......@@ -44,7 +44,7 @@ public interface TOMReceiver {
* @param regency
* @param requests The batch with TOMMessage objects.
*/
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests);
public void receiveMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests, byte[] decision);
/**
......@@ -53,13 +53,13 @@ public interface TOMReceiver {
* state.
* @return An rray of bytes that can be diserialized into the application state
*/
public byte[] getState();
//public byte[] getState();
/**
* This method is invoked by the TOM Layer in order to ser a state upon the aplication. This is done when
* a replica is delayed compared to the rest of the group, and when it recovers after a failure.
*/
public void setState(byte[] state);
//public void setState(byte[] state);
}
......@@ -25,10 +25,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import navigators.smart.paxosatwar.Consensus;
import navigators.smart.reconfiguration.ServerViewManager;
import navigators.smart.statemanagment.TransferableState;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.TOMReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.messages.TOMMessageType;
import navigators.smart.tom.server.Recoverable;
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
......@@ -41,6 +42,7 @@ public final class DeliveryThread extends Thread {
private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
private TOMLayer tomLayer; // TOM layer
private TOMReceiver receiver; // Object that receives requests from clients
private Recoverable recoverer; // Object that uses state transfer
private ServerViewManager manager;
/**
......@@ -49,16 +51,26 @@ public final class DeliveryThread extends Thread {
* @param receiver Object that receives requests from clients
* @param conf TOM configuration
*/
public DeliveryThread(TOMLayer tomLayer, TOMReceiver receiver, ServerViewManager manager) {
public DeliveryThread(TOMLayer tomLayer, TOMReceiver receiver, Recoverable recoverer, ServerViewManager manager) {
super("Delivery Thread");
this.tomLayer = tomLayer;
this.receiver = receiver;
this.recoverer = recoverer;
//******* EDUARDO BEGIN **************//
this.manager = manager;
//******* EDUARDO END **************//
}
public TOMReceiver getReceiver() {
return receiver;
}
public Recoverable getRecoverer() {
return recoverer;
}
/**
* Invoked by the TOM layer, to deliver a decide consensus
* @param cons Consensus established as being decided
......@@ -111,55 +123,9 @@ public final class DeliveryThread extends Thread {
canDeliver.signalAll();
}
public void update(TransferableState state) {
int lastCheckpointEid = state.getLastCheckpointEid();
//int lastEid = state.getLastEid();
int lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
+ lastCheckpointEid + " to EID " + lastEid);
receiver.setState(state.getState());
tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
state.getLastCheckpointLeader());
for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
try {
byte[] batch = state.getMessageBatch(eid).batch; // take a batch
tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
state.getMessageBatch(eid).leader);
Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
TOMMessage[] requests = new BatchReader(batch,
manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
tomLayer.clientsManager.requestsOrdered(requests);
deliverMessages(eid, tomLayer.getLCManager().getLastReg(), false, requests);
//******* EDUARDO BEGIN **************//
if (manager.hasUpdates()) {
processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
}
//******* EDUARDO END **************//
} catch (Exception e) {
e.printStackTrace(System.err);
if (e instanceof ArrayIndexOutOfBoundsException) {
System.out.println("Eid do ultimo checkpoint: " + state.getLastCheckpointEid());
System.out.println("Eid do ultimo consenso: " + state.getLastEid());
System.out.println("numero de mensagens supostamente no batch: " + (state.getLastEid() - state.getLastCheckpointEid() + 1));
System.out.println("numero de mensagens realmente no batch: " + state.getMessageBatches().length);
}
}
}
public void update(int eid, ApplicationState state) {
int lastEid = recoverer.setState(eid, state);
//set this consensus as the last executed
tomLayer.setLastExec(lastEid);
......@@ -179,7 +145,7 @@ public final class DeliveryThread extends Thread {
decided.clear();
Logger.println("(DeliveryThread.update) All finished from " + lastCheckpointEid + " to " + lastEid);
Logger.println("(DeliveryThread.update) All finished from up to " + lastEid);
}
/**
......@@ -215,7 +181,7 @@ public final class DeliveryThread extends Thread {
//clean the ordered messages from the pending buffer
tomLayer.clientsManager.requestsOrdered(requests);
deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), true, requests);
deliverMessages(cons.getId(), tomLayer.getLCManager().getLastReg(), true, requests, cons.getDecision());
//******* EDUARDO BEGIN **************//
if (manager.hasUpdates()) {
......@@ -228,7 +194,7 @@ public final class DeliveryThread extends Thread {
//******* EDUARDO END **************//
/** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */
logDecision(cons);
//logDecision(cons);
/********************************************************/
//define the last stable consensus... the stable consensus can
//be removed from the leaderManager and the executionManager
......@@ -277,8 +243,8 @@ public final class DeliveryThread extends Thread {
receiver.receiveReadonlyMessage(request, msgCtx);
}
private void deliverMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests) {
receiver.receiveMessages(consId, regency, fromConsensus, requests);
private void deliverMessages(int consId, int regency, boolean fromConsensus, TOMMessage[] requests, byte[] decision) {
receiver.receiveMessages(consId, regency, fromConsensus, requests, decision);
}
private void processReconfigMessages(int consId, int decisionRoundNumber) {
......@@ -299,13 +265,11 @@ public final class DeliveryThread extends Thread {
if (manager.getStaticConf().getCheckpointPeriod() > 0) {
if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
byte[] state = receiver.getState();
tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
//TODO: possivelmente fazer mais alguma coisa
//byte[] state = receiver.getState();
//tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
//TODO: possivelmente fazer mais alguma coisa
//tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
}
}
}
......
......@@ -61,6 +61,7 @@ import navigators.smart.tom.leaderchange.LCMessage;
import navigators.smart.tom.leaderchange.CollectData;
import navigators.smart.tom.leaderchange.LCManager;
import navigators.smart.tom.leaderchange.LastEidData;
import navigators.smart.tom.server.Recoverable;
/**
......@@ -124,6 +125,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
*/
public TOMLayer(ExecutionManager manager,
TOMReceiver receiver,
Recoverable recoverer,
LeaderModule lm,
Acceptor a,
ServerCommunicationSystem cs,
......@@ -163,7 +165,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
this.lcManager = new LCManager(this,recManager, md);
/*************************************************************/
this.dt = new DeliveryThread(this, receiver, this.reconfManager); // Create delivery thread
this.dt = new DeliveryThread(this, receiver, recoverer, this.reconfManager); // Create delivery thread
this.dt.start();
/** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS AND STATE TRANSFER */
......@@ -1028,14 +1030,14 @@ public final class TOMLayer extends Thread implements RequestReceiver {
if (getLastExec() + 1 < lastHighestEid.getEid()) {
//TODO: Case in which it is necessary to apply state transfer
System.out.println("NEEDING TO USE STATE TRANSFER!!");
System.out.println("NEEDING TO USE STATE TRANSFER!! (" + lastHighestEid.getEid() + ")");
} else if (getLastExec() + 1 == lastHighestEid.getEid()) {
// Is this replica still executing the last decided consensus?
//TODO: it is necessary to verify the proof
System.out.println("I'm still at the eid before the most recent onedl!");
System.out.println("I'm still at the eid before the most recent one!!! (" + lastHighestEid.getEid() + ")");
exec = execManager.getExecution(lastHighestEid.getEid());
r = exec.getLastRound();
......
......@@ -24,6 +24,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.server.SingleExecutable;
......@@ -95,7 +96,6 @@ public final class CounterServer implements SingleExecutable, Recoverable {
}
/** THIS IS JOAO'S CODE, TO HANDLE CHECKPOINTS */
@Override
public byte[] getState() {
//System.out.println("reading counter: "+this.counter);
......@@ -110,7 +110,6 @@ public final class CounterServer implements SingleExecutable, Recoverable {
//throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setState(byte[] state) {
int value = 0;
......@@ -125,5 +124,15 @@ public final class CounterServer implements SingleExecutable, Recoverable {
// System.out.println("Value of deserialized counter "+this.counter);
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
/********************************************************/
}
......@@ -18,6 +18,7 @@
package navigators.smart.tom.demo.microbenchmarks;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.server.SingleExecutable;
......@@ -124,4 +125,14 @@ public class LatencyServer implements SingleExecutable, Recoverable {
public void setState(byte[] state) {
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
......@@ -18,6 +18,7 @@
package navigators.smart.tom.demo.microbenchmarks;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.server.Executable;
......@@ -147,4 +148,14 @@ public final class ThroughputLatencyServer implements SingleExecutable, Recovera
public void setState(byte[] state) {
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
......@@ -28,6 +28,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.ServiceReplica;
import java.util.Scanner;
import navigators.smart.tom.MessageContext;
......@@ -172,4 +173,14 @@ public final class RandomServer implements Executable, Recoverable {
this.value = value;
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int setState(int eid, ApplicationState state) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
......@@ -21,7 +21,7 @@
* and open the template in the editor.
*/
package navigators.smart.statemanagment;
package navigators.smart.tom.server;
import java.io.Serializable;
import java.util.Arrays;
......@@ -30,29 +30,48 @@ import java.util.Arrays;
*
* @author Joao Sousa
*/
public class BatchInfo implements Serializable {
public class CommandsInfo implements Serializable {
public final byte[] batch;
public final byte[][] commands;
public final int round;
public final int leader;
public BatchInfo () {
this.batch = null;
public CommandsInfo () {
this.commands = null;
this.round = -1;
this.leader = -1;
}
public BatchInfo(byte[] batch, int round, int leader) {
this.batch = batch;
public CommandsInfo(byte[][] commands, int round, int leader) {
this.commands = commands;
this.round = round;
this.leader = leader;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BatchInfo) {
BatchInfo bi = (BatchInfo) obj;
return Arrays.equals(this.batch, bi.batch) && this.round == bi.round && this.leader == bi.leader;
if (obj instanceof CommandsInfo) {
CommandsInfo ci = (CommandsInfo) obj;
if ((this.commands != null && ci.commands == null) ||
(this.commands == null && ci.commands != null)) return false;
if (this.commands != null && ci.commands != null) {
if (this.commands.length != ci.commands.length) return false;
for (int i = 0; i < this.commands.length; i++) {
if (this.commands[i] == null && ci.commands[i] != null) return false;
if (this.commands[i] != null && ci.commands[i] == null) return false;
if (!(this.commands[i] == null && ci.commands[i] == null) &&
(!Arrays.equals(this.commands, ci.commands))) return false;
}
}
return this.round == ci.round && this.leader == ci.leader;
}
return false;
}
......@@ -61,10 +80,16 @@ public class BatchInfo implements Serializable {
public int hashCode() {
int hash = 1;
if (this.batch != null) {
for (int j = 0; j < this.batch.length; j++)
hash = hash * 31 + (int) this.batch[j];
if (this.commands != null) {
for (int i = 0; i < this.commands.length; i++) {
if (this.commands[i] != null) {
for (int j = 0; j < this.commands[i].length; j++)
hash = hash * 31 + (int) this.commands[i][j];
} else {
hash = hash * 31 + 0;
}
}
} else {
hash = hash * 31 + 0;
}
......
......@@ -16,10 +16,11 @@
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.statemanagment;
package navigators.smart.tom.server;
import java.io.Serializable;
import java.util.Arrays;
import navigators.smart.statemanagment.ApplicationState;
/**
* This classe represents a state tranfered from a replica to another. The state associated with the last
......@@ -28,16 +29,17 @@ import java.util.Arrays;
*
* @author Jo�o Sousa
*/
public class TransferableState implements Serializable {
public class DefaultApplicationState implements ApplicationState {
private BatchInfo[] messageBatches; // batches received since the last checkpoint.
protected byte[] state; // State associated with the last checkpoint
protected byte[] stateHash; // Hash of the state associated with the last checkpoint
protected int lastEid = -1; // Execution ID for the last messages batch delivered to the application
protected boolean hasState; // indicates if the replica really had the requested state
private CommandsInfo[] messageBatches; // batches received since the last checkpoint.
private int lastCheckpointEid; // Execution ID for the last checkpoint
private int lastCheckpointRound; // Round for the last checkpoint
private int lastCheckpointLeader; // Leader for the last checkpoint
private byte[] state; // State associated with the last checkpoint
private byte[] stateHash; // Hash of the state associated with the last checkpoint
private int lastEid = -1; // Execution ID for the last messages batch delivered to the application
private boolean hasState; // indicates if the replica really had the requested state
/**
* Constructs a TansferableState
......@@ -46,8 +48,8 @@ public class TransferableState implements Serializable {
* @param state State associated with the last checkpoint
* @param stateHash Hash of the state associated with the last checkpoint
*/
public TransferableState(BatchInfo[] messageBatches, int lastCheckpointEid, int lastCheckpointRound, int lastCheckpointLeader, int lastEid, byte[] state, byte[] stateHash) {
public DefaultApplicationState(CommandsInfo[] messageBatches, int lastCheckpointEid, int lastCheckpointRound, int lastCheckpointLeader, int lastEid, byte[] state, byte[] stateHash) {
this.messageBatches = messageBatches; // batches received since the last checkpoint.
this.lastCheckpointEid = lastCheckpointEid; // Execution ID for the last checkpoint
this.lastCheckpointRound = lastCheckpointRound; // Round for the last checkpoint
......@@ -62,7 +64,9 @@ public class TransferableState implements Serializable {
* Constructs a TansferableState
* This constructor should be used when there isn't a valid state to construct the object with
*/
public TransferableState() {
public DefaultApplicationState() {
this.messageBatches = null; // batches received since the last checkpoint.
this.lastCheckpointEid = -1; // Execution ID for the last checkpoint
this.lastCheckpointRound = -1; // Round for the last checkpoint
......@@ -72,7 +76,16 @@ public class TransferableState implements Serializable {
this.stateHash = null;
this.hasState = false;
}
public void setSerializedState(byte[] state) {
this.state = state;
}
public byte[] getSerializedState() {
return state;
}
/**
* Indicates if the TransferableState object has a valid state
* @return true if it has a valid state, false otherwise
......@@ -81,11 +94,45 @@ public class TransferableState implements Serializable {
return hasState;
}
/**
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application
*/
public int getLastEid() {
return lastEid;
}
/**
* Retrieves the state associated with the last checkpoint
* @return State associated with the last checkpoint
*/
public byte[] getState() {
return state;
}
/**
* Retrieves the hash of the state associated with the last checkpoint
* @return Hash of the state associated with the last checkpoint
*/
public byte[] getStateHash() {
return stateHash;
}
/**
* Sets the state associated with the last checkpoint
* @param state State associated with the last checkpoint
*/
public void setState(byte[] state) {
this.state = state;
}
/**
* Retrieves all batches of messages
* @return Batch of messages
*/
public BatchInfo[] getMessageBatches() {
public CommandsInfo[] getMessageBatches() {
return messageBatches;
}
......@@ -94,7 +141,7 @@ public class TransferableState implements Serializable {
* @param eid Execution ID associated with the batch to be fetched
* @return The batch of messages associated with the batch correspondent execution ID
*/
public BatchInfo getMessageBatch(int eid) {
public CommandsInfo getMessageBatch(int eid) {
if (eid >= lastCheckpointEid && eid <= lastEid) {
return messageBatches[eid - lastCheckpointEid - 1];
}
......@@ -128,43 +175,10 @@ public class TransferableState implements Serializable {
return lastCheckpointLeader;
}
/**
* Retrieves the execution ID for the last messages batch delivered to the application
* @return Execution ID for the last messages batch delivered to the application
*/
public int getLastEid() {
return lastEid;
}
/**
* Retrieves the state associated with the last checkpoint
* @return State associated with the last checkpoint
*/
public byte[] getState() {
return state;
}
/**
* Retrieves the hash of the state associated with the last checkpoint
* @return Hash of the state associated with the last checkpoint
*/
public byte[] getStateHash() {
return stateHash;
}
/**
* Sets the state associated with the last checkpoint
* @param state State associated with the last checkpoint
*/
public void setState(byte[] state) {
this.state = state;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TransferableState) {
TransferableState tState = (TransferableState) obj;
if (obj instanceof DefaultApplicationState) {
DefaultApplicationState tState = (DefaultApplicationState) obj;
if ((this.messageBatches != null && tState.messageBatches == null) ||
(this.messageBatches == null && tState.messageBatches != null)) return false;
......@@ -219,4 +233,5 @@ public class TransferableState implements Serializable {
}
return hash;
}
}
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package navigators.smart.tom.server;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import navigators.smart.statemanagment.ApplicationState;
import navigators.smart.tom.MessageContext;
import navigators.smart.tom.util.Logger;
/**
*
* @author Joao Sousa
*/
public abstract class DefaultRecoverable implements Recoverable, BatchExecutable {
public static final int CHECKPOINT_PERIOD = 50;
private ReentrantLock lockState = new ReentrantLock();
private ReentrantLock hashLock = new ReentrantLock();
private MessageDigest md;
private StateLog log;
public DefaultRecoverable() {
log = new StateLog(CHECKPOINT_PERIOD);
try {
md = MessageDigest.getInstance("MD5"); // TODO: shouldn't it be SHA?
} catch (NoSuchAlgorithmException ex) {
java.util.logging.Logger.getLogger(DefaultRecoverable.class.getName()).log(Level.SEVERE, null, ex);
}
}
public byte[][] executeBatch(byte[][] commands, MessageContext[] msgCtxs) {
int eid = msgCtxs[0].getConsensusId();
if ((eid > 0) && ((eid % CHECKPOINT_PERIOD) == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + eid);
saveState(getSnapshot(), eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + eid);
saveCommands(commands, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
}
return executeBatch2(commands, msgCtxs); }
public final byte[] computeHash(byte[] data) {
byte[] ret = null;
hashLock.lock();
ret = md.digest(data);
hashLock.unlock();
return ret;
}
private StateLog getLog() {
return log;
}
public void saveState(byte[] snapshot, int lastEid, int decisionRound, int leader) {
StateLog thisLog = getLog();
lockState.lock();
Logger.println("(TOMLayer.saveState) Saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
thisLog.newCheckpoint(snapshot, computeHash(snapshot));
thisLog.setLastEid(-1);
thisLog.setLastCheckpointEid(lastEid);
thisLog.setLastCheckpointRound(decisionRound);
thisLog.setLastCheckpointLeader(leader);
lockState.unlock();
System.out.println("fiz checkpoint");
System.out.println("tamanho do snapshot: " + snapshot.length);
System.out.println("tamanho do log: " + thisLog.getMessageBatches().length);
Logger.println("(TOMLayer.saveState) Finished saving state of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
public void saveCommands(byte[][] commands, int lastEid, int decisionRound, int leader) {
StateLog thisLog = getLog();
lockState.lock();
Logger.println("(TOMLayer.saveBatch) Saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
thisLog.addMessageBatch(commands, decisionRound, leader);
thisLog.setLastEid(lastEid);
lockState.unlock();
System.out.println("guardei comandos");
System.out.println("tamanho do log: " + thisLog.getNumBatches());
Logger.println("(TOMLayer.saveBatch) Finished saving batch of EID " + lastEid + ", round " + decisionRound + " and leader " + leader);
}
@Override
public ApplicationState getState(int eid, boolean sendState) {
return (eid > -1 ? getLog().getTransferableState(eid, sendState) : new DefaultApplicationState());
}
@Override
public int setState(int recvEid, ApplicationState recvState) {
int lastEid = -1;
if (recvState instanceof DefaultApplicationState) {
DefaultApplicationState state = (DefaultApplicationState) recvState;
getLog().update(state);
int lastCheckpointEid = state.getLastCheckpointEid();
//int lastEid = state.getLastEid();
lastEid = lastCheckpointEid + (state.getMessageBatches() != null ? state.getMessageBatches().length : 0);
navigators.smart.tom.util.Logger.println("(DeliveryThread.update) I'm going to update myself from EID "
+ lastCheckpointEid + " to EID " + lastEid);
installSnapshot(state.getState());
// INUTIL??????
//tomLayer.lm.addLeaderInfo(lastCheckpointEid, state.getLastCheckpointRound(),
// state.getLastCheckpointLeader());
for (int eid = lastCheckpointEid + 1; eid <= lastEid; eid++) {
try {
byte[][] commands = state.getMessageBatch(eid).commands; // take a batch
// INUTIL??????
//tomLayer.lm.addLeaderInfo(eid, state.getMessageBatch(eid).round,
// state.getMessageBatch(eid).leader);
navigators.smart.tom.util.Logger.println("(DeliveryThread.update) interpreting and verifying batched requests.");
//TROCAR POR EXECUTE E ARRAY DE MENSAGENS!!!!!!
//TOMMessage[] requests = new BatchReader(batch,
// manager.getStaticConf().getUseSignatures() == 1).deserialiseRequests(manager);
executeBatch2(commands, null);
// ISTO E UM PROB A RESOLVER!!!!!!!!!!!!
//tomLayer.clientsManager.requestsOrdered(requests);
// INUTIL??????
//deliverMessages(eid, tomLayer.getLCManager().getLastReg(), false, requests, batch);
// IST E UM PROB A RESOLVER!!!!
//******* EDUARDO BEGIN **************//
/*if (manager.hasUpdates()) {
processReconfigMessages(lastCheckpointEid, state.getLastCheckpointRound());
}*/
//******* EDUARDO END **************//
} catch (Exception e) {
e.printStackTrace(System.err);
if (e instanceof ArrayIndexOutOfBoundsException) {
System.out.println("Eid do ultimo checkpoint: " + state.getLastCheckpointEid());
System.out.println("Eid do ultimo consenso: " + state.getLastEid());
System.out.println("numero de mensagens supostamente no batch: " + (state.getLastEid() - state.getLastCheckpointEid() + 1));
System.out.println("numero de mensagens realmente no batch: " + state.getMessageBatches().length);
}
}
}
}
return lastEid;
}
public abstract void installSnapshot(byte[] state);
public abstract byte[] getSnapshot();
public abstract byte[][] executeBatch2(byte[][] commands, MessageContext[] msgCtxs);
}
package navigators.smart.tom.server;
import navigators.smart.statemanagment.ApplicationState;
/**
*
* @author mhsantos
......@@ -7,7 +9,22 @@ package navigators.smart.tom.server;
*/
public interface Recoverable {
public byte[] getState();
public void setState(byte[] state);
/**
*
* This method should return a representation of the application state
* @param eid Execution up to which the application should return an Application state
* @param sendState true if the replica should send a complete
* representation of the state instead of only the hash. False otherwise
* @return A representation of the application state
*/
public ApplicationState getState(int eid, boolean sendState);
/**
* Sets the state to the representation obtained in the state transfer protocol
* @param eid Execution up to which the state is complete
* @param state State obtained in the state transfer protocol
* @return
*/
public int setState(int eid, ApplicationState state);
}
......@@ -16,7 +16,7 @@
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.statemanagment;
package navigators.smart.tom.server;
/**
* This classes serves as a log for the state associated with the last checkpoint, and the message
......@@ -28,7 +28,7 @@ package navigators.smart.statemanagment;
*/
public class StateLog {
private BatchInfo[] messageBatches; // batches received since the last checkpoint.
private CommandsInfo[] messageBatches; // batches received since the last checkpoint.
private int lastCheckpointEid; // Execution ID for the last checkpoint
private int lastCheckpointRound; // Decision round for the last checkpoint
private int lastCheckpointLeader; // Leader for the last checkpoint
......@@ -43,7 +43,7 @@ public class StateLog {
*/
public StateLog(int k) {
this.messageBatches = new BatchInfo[k - 1];
this.messageBatches = new CommandsInfo[k - 1];
this.lastCheckpointEid = -1;
this.lastCheckpointRound = -1;
this.lastCheckpointLeader = -1;
......@@ -162,11 +162,11 @@ public class StateLog {
* the 'k' batches received after the last checkpoint are supposed to be kept
* @param batch The batch of messages to be kept.
*/
public void addMessageBatch(byte[] batch, int round, int leader) {
public void addMessageBatch(byte[][] commands, int round, int leader) {
if (position < messageBatches.length) {
messageBatches[position] = new BatchInfo(batch, round, leader);
messageBatches[position] = new CommandsInfo(commands, round, leader);
position++;
}
}
......@@ -176,7 +176,7 @@ public class StateLog {
* @param eid Execution ID associated with the batch to be fetched
* @return The batch of messages associated with the batch correspondent execution ID
*/
public BatchInfo getMessageBatch(int eid) {
public CommandsInfo getMessageBatch(int eid) {
if (eid > lastCheckpointEid && eid <= lastEid) {
return messageBatches[eid - lastCheckpointEid - 1];
}
......@@ -187,7 +187,7 @@ public class StateLog {
* Retrieves all the stored batches kept since the last checkpoint
* @return All the stored batches kept since the last checkpoint
*/
public BatchInfo[] getMessageBatches() {
public CommandsInfo[] getMessageBatches() {
return messageBatches;
}
......@@ -203,17 +203,17 @@ public class StateLog {
* @param eid Execution ID correspondent to desired state
* @return TransferableState Object containing this log information
*/
public TransferableState getTransferableState(int eid, boolean setState) {
public DefaultApplicationState getTransferableState(int eid, boolean setState) {
if (lastCheckpointEid > -1 && eid >= lastCheckpointEid) {
BatchInfo[] batches = null;
CommandsInfo[] batches = null;
if (eid <= lastEid) {
int size = eid - lastCheckpointEid ;
if (size > 0) {
batches = new BatchInfo[size];
batches = new CommandsInfo[size];
for (int i = 0; i < size; i++)
batches[i] = messageBatches[i];
......@@ -222,7 +222,7 @@ public class StateLog {
batches = messageBatches;
}
return new TransferableState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, eid, (setState ? state : null), stateHash);
return new DefaultApplicationState(batches, lastCheckpointEid, lastCheckpointRound, lastCheckpointLeader, eid, (setState ? state : null), stateHash);
}
else return null;
......@@ -232,7 +232,7 @@ public class StateLog {
* Updates this log, according to the information contained in the TransferableState object
* @param transState TransferableState object containing the information which is used to updated this log
*/
public void update(TransferableState transState) {
public void update(DefaultApplicationState transState) {
position = 0;
if (transState.getMessageBatches() != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册