提交 880a3746 编写于 作者: B bessani@gmail.com

Cleanup on the TOMConfiguration. Removed several configuration parameters that...

Cleanup on the TOMConfiguration. Removed several configuration parameters that do not make sense anymore. Besides that merged some packages and cleaned a bit more of code here and there.
上级 46889a58
......@@ -24,7 +24,7 @@ import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.statemanagment.SMMessage;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.timer.messages.ForwardedMessage;
import navigators.smart.tom.core.timer.ForwardedMessage;
import navigators.smart.tom.util.TOMUtil;
import navigators.smart.tom.leaderchange.LCMessage;
......
......@@ -107,7 +107,7 @@ public class ServerCommunicationSystem extends Thread {
long count = 0;
while (true) {
try {
if (count % 1000 == 0) {
if (count % 1000 == 0 && count > 0) {
Logger.println("(ServerCommunicationSystem.run) After " + count + " messages, inQueue size=" + inQueue.size());
}
......
......@@ -15,24 +15,18 @@
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.communication.client;
import navigators.smart.communication.client.netty.NettyClientServerCommunicationSystemClientSide;
import navigators.smart.reconfiguration.ViewManager;
/**
*
* @author Paulo
*/
public class CommunicationSystemClientSideFactory {
public static CommunicationSystemClientSide getCommunicationSystemClientSide(ViewManager manager){
if (manager.getStaticConf().clientServerCommSystem() == 1){
return new NettyClientServerCommunicationSystemClientSide(manager);
}
System.out.println("Error: no client-server communication system is defined");
return null;
public static CommunicationSystemClientSide getCommunicationSystemClientSide(ViewManager manager) {
return new NettyClientServerCommunicationSystemClientSide(manager);
}
}
......@@ -15,26 +15,18 @@
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.communication.client;
import navigators.smart.communication.client.netty.NettyClientServerCommunicationSystemServerSide;
import navigators.smart.reconfiguration.ReconfigurationManager;
/**
*
* @author Paulo
*/
public class CommunicationSystemServerSideFactory {
public static CommunicationSystemServerSide getCommunicationSystemServerSide(ReconfigurationManager manager){
if (manager.getStaticConf().clientServerCommSystem() == 1){
return new NettyClientServerCommunicationSystemServerSide(manager);
}
System.out.println("Error: no client-server communication system is defined");
return null;
public static CommunicationSystemServerSide getCommunicationSystemServerSide(ReconfigurationManager manager) {
return new NettyClientServerCommunicationSystemServerSide(manager);
}
}
......@@ -66,6 +66,7 @@ public class NettyClientPipelineFactory implements ChannelPipelineFactory {
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
p.addLast("decoder", new NettyTOMMessageDecoder(isClient, sessionTable, authKey, macLength,manager,rl,signatureLength,manager.getStaticConf().getUseMACs()==1?true:false));
......
......@@ -20,15 +20,15 @@ package navigators.smart.communication.client.netty;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.spec.InvalidKeySpecException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.Executors;
......@@ -143,10 +143,13 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
+ "mas temos que fazer os servidores armazenarem as view em um lugar default.");
} catch (InvalidKeyException ex) {
ex.printStackTrace(System.err);
}
}
} catch (InvalidKeySpecException ex) {
ex.printStackTrace(System.err);
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace(System.err);
}
}
......@@ -231,10 +234,11 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
//if (!(e.getCause() instanceof ClosedChannelException) && !(e.getCause() instanceof ConnectException)) {
System.out.println("Excepção no CS (client side)!");
e.getCause().printStackTrace();
//}
if (!(e.getCause() instanceof ClosedChannelException) && !(e.getCause() instanceof ConnectException)) {
System.out.println("Connection with server closed.");
} else {
e.getCause().printStackTrace(System.err);
}
}
@Override
......
......@@ -20,7 +20,9 @@ package navigators.smart.communication.client.netty;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.util.ArrayList;
......@@ -33,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
......@@ -49,7 +50,6 @@ import navigators.smart.tom.util.TOMUtil;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
......@@ -123,47 +123,27 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
//if (!(e.getCause() instanceof ClosedChannelException))
System.out.println("Excepção no CS (server side)!");
e.getCause().printStackTrace();
public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) {
if (!(e.getCause() instanceof ClosedChannelException) && !(e.getCause() instanceof ConnectException)) {
System.out.println("Connection with client closed...");
} else {
e.getCause().printStackTrace(System.err);
}
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
TOMMessage sm = (TOMMessage) e.getMessage();
//******* EDUARDO BEGIN **************//
if (manager.getStaticConf().getCommBuffering() > 0) {
lock.lock();
requestsReceived.add(sm);
if (requestsReceived.size()>= manager.getStaticConf().getCommBuffering()){
//******* EDUARDO END **************//
for (int i=0; i<requestsReceived.size(); i++) {
//delivers message to TOMLayer
requestReceiver.requestReceived(requestsReceived.get(i));
}
requestsReceived = null;
requestsReceived = new ArrayList<TOMMessage>();
}
lock.unlock();
} else {
//delivers message to TOMLayer
requestReceiver.requestReceived(sm);
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
//delivers message to TOMLayer
requestReceiver.requestReceived((TOMMessage) e.getMessage());
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
navigators.smart.tom.util.Logger.println("Session Created, active clients=" + sessionTable.size());
public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) {
Logger.println("Session Created, active clients=" + sessionTable.size());
}
@Override
public void channelClosed(
ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelClosed( ChannelHandlerContext ctx, ChannelStateEvent e) {
rl.writeLock().lock();
//removes session from sessionTable
Set s = sessionTable.entrySet();
......
......@@ -57,10 +57,9 @@ public class Execution {
* @param consensus Consensus instance to which this execution works for
* @param initialTimeout Initial timeout for rounds
*/
protected Execution(ExecutionManager manager, Consensus consensus, long initialTimeout) {
protected Execution(ExecutionManager manager, Consensus consensus) {
this.manager = manager;
this.consensus = consensus;
this.initialTimeout = initialTimeout;
}
/**
......@@ -107,7 +106,7 @@ public class Execution {
Round round = rounds.get(number);
if(round == null && create){
round = new Round(recManager, this, number, initialTimeout);
round = new Round(recManager, this, number);
rounds.put(number, round);
}
......@@ -179,7 +178,7 @@ public class Execution {
}
max++;
Round round = new Round(recManager, this, max, initialTimeout);
Round round = new Round(recManager, this, max);
rounds.put(max, round);
roundsLock.unlock();
......@@ -224,7 +223,7 @@ public class Execution {
*/
public Round getLastRound() {
roundsLock.lock();
if (rounds.size() == 0) {
if (rounds.isEmpty()) {
roundsLock.unlock();
return null;
}
......@@ -234,7 +233,7 @@ public class Execution {
}
/**
* Informs wether or not the execution is decided
* Informs whether or not the execution is decided
*
* @return True if it is decided, false otherwise
*/
......@@ -246,7 +245,7 @@ public class Execution {
* Called by the Acceptor, to set the decided value
*
* @param value The decided value
* @param round The round at which a desision was made
* @param round The round at which a decision was made
*/
public void decided(Round round, byte[] value) {
if (!decided) {
......
......@@ -64,7 +64,6 @@ public final class ExecutionManager {
private Round stoppedRound = null; // round at which the current execution was stoppped
private ReentrantLock stoppedMsgsLock = new ReentrantLock(); //lock for stopped messages
private TOMLayer tomLayer; // TOM layer associated with this execution manager
private long initialTimeout; // initial timeout for rounds
private int paxosHighMark; // Paxos high mark for consensus instances
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
private int revivalHighMark; // Paxos high mark for consensus instances when this replica EID equals 0
......@@ -81,14 +80,12 @@ public final class ExecutionManager {
* @param initialTimeout initial timeout for rounds
*/
public ExecutionManager(ReconfigurationManager manager, Acceptor acceptor,
Proposer proposer, int me, long initialTimeout) {
Proposer proposer, int me) {
//******* EDUARDO BEGIN **************//
this.reconfManager = manager;
this.acceptor = acceptor;
this.proposer = proposer;
//this.acceptors = manager.getCurrentViewProcesses();
//this.me = me;
this.initialTimeout = initialTimeout;
this.paxosHighMark = reconfManager.getStaticConf().getPaxosHighMark();
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
......@@ -299,8 +296,8 @@ public final class ExecutionManager {
/**
* Returns the specified consensus' execution
*
* @param eid ID of the consensus' execution to be returned
* @return The consensus' execution specified
* @param eid ID of the consensus execution to be returned
* @return The consensus execution specified
*/
public Execution getExecution(int eid) {
executionsLock.lock();
......@@ -312,7 +309,7 @@ public final class ExecutionManager {
//let's create one...
Consensus cons = new Consensus(eid);
execution = new Execution(this, cons, initialTimeout);
execution = new Execution(this, cons);
//...and add it to the executions table
executions.put(eid, execution);
......@@ -394,6 +391,7 @@ public final class ExecutionManager {
outOfContextLock.unlock();
}
@Override
public String toString() {
return stoppedMsgs.toString();
}
......
......@@ -32,18 +32,17 @@ import navigators.smart.tom.core.messages.TOMMessage;
public class Round implements Serializable {
private transient Execution execution; // Execution where the round belongs to
//private transient TimeoutTask timeoutTask; // Timeout ssociated with this round
private int number; // Round's number
private int me; // Process ID
private boolean[] weakSetted;
private boolean[] strongSetted;
private byte[][] weak; // weakling accepted values from other processes
private byte[][] strong; // strongly accepted values from other processes
private byte[][] decide; // values decided by other processes
private Collection<Integer> freeze = null; // processes where this round was freezed
private boolean frozen = false; // is this round frozen?
private boolean collected = false; // indicates if a collect message for this round was already sent
//private long timeout; // duration of the timeout
private boolean alreadyRemoved = false; // indicates if this round was removed from its execution
public byte[] propValue = null; // proposed value
......@@ -60,7 +59,7 @@ public class Round implements Serializable {
* @param number Number of the round
* @param timeout Timeout duration for this round
*/
protected Round(ReconfigurationManager manager, Execution parent, int number, long timeout) {
protected Round(ReconfigurationManager manager, Execution parent, int number) {
this.execution = parent;
this.number = number;
this.manager = manager;
......@@ -80,22 +79,15 @@ public class Round implements Serializable {
if (number == 0) {
this.weak = new byte[n][];
this.strong = new byte[n][];
this.decide = new byte[n][];
Arrays.fill((Object[]) weak, null);
Arrays.fill((Object[]) strong, null);
Arrays.fill((Object[]) decide, null);
} else {
Round previousRound = execution.getRound(number - 1, manager);
this.weak = previousRound.getWeak();
this.strong = previousRound.getStrong();
this.decide = previousRound.getDecide();
}
//define the timeout for this round (not used anymore)
//this.timeout = (int) Math.pow(2, number) * timeout;
//execution.getManager().getAcceptor().scheduleTimeout(this);
}
/**
......@@ -153,22 +145,6 @@ public class Round implements Serializable {
return execution;
}
/**
* Sets the timeout associated with this round
* @param timeoutTask The timeout associated with this round
*/
/*public void setTimeoutTask(TimeoutTask timeoutTask) {
this.timeoutTask = timeoutTask;
}*/
/**
* Retrieves the timeout associated with this round
* @param timeoutTask The timeout associated with this round
*/
/*public TimeoutTask getTimeoutTask() {
return timeoutTask;
}*/
/**
* Informs if there is a weakly accepted value from a replica
* @param acceptor The replica ID
......@@ -189,17 +165,6 @@ public class Round implements Serializable {
//******* EDUARDO END **************//
}
/**
* Informs if there is a decided value from a replica
* @param acceptor The replica ID
* @return True if there is a decided value from a replica, false otherwise
*/
public boolean isDecideSetted(int acceptor) {
//******* EDUARDO BEGIN **************//
return decide[this.manager.getCurrentViewPos(acceptor)] != null;
//******* EDUARDO END **************//
}
/**
* Retrives the weakly accepted value from the specified replica
* @param acceptor The replica ID
......@@ -268,36 +233,6 @@ public class Round implements Serializable {
//******* EDUARDO END **************//
}
/**
* Retrieves the decided value by the specified replica
* @param acceptor The replica ID
* @return The value decided by the specified replica
*/
public byte[] getDecide(int acceptor) {
//******* EDUARDO BEGIN **************//
return decide[this.manager.getCurrentViewPos(acceptor)];
//******* EDUARDO END **************//
}
/**
* Retrives all the decided values by all replicas
* @return The values decided by all replicas
*/
public byte[][] getDecide() {
return decide;
}
/**
* Sets the decided value by the specified replica
* @param acceptor The replica ID
* @param value The value decided by the specified replica
*/
public void setDecide(int acceptor, byte[] value) {
//******* EDUARDO BEGIN **************//
decide[this.manager.getCurrentViewPos(acceptor)] = value;
//******* EDUARDO END **************//
}
/**
* Indicates if a collect message for this round was already sent
* @return True if done so, false otherwise
......@@ -366,15 +301,6 @@ public class Round implements Serializable {
return count(strongSetted,strong, value);
}
/**
* Retrives the ammount of replicas that decided a specified value
* @param value The value in question
* @return Ammount of replicas that decided the specified value
*/
public int countDecide(byte[] value) {
return count(null,decide, value);
}
/**
* Counts how many times 'value' occurs in 'array'
* @param array Array where to count
......@@ -413,12 +339,10 @@ public class Round implements Serializable {
for (int i = 0; i < weak.length - 1; i++) {
buffWeak.append(str(weak[i]) + " [" + (weak[i] != null ? weak[i].length : 0) + " bytes] ,");
buffStrong.append(str(strong[i]) + " [" + (strong[i] != null ? strong[i].length : 0) + " bytes] ,");
buffDecide.append(str(decide[i]) + " [" + (decide[i] != null ? decide[i].length : 0) + " bytes] ,");
}
buffWeak.append(str(weak[weak.length - 1]) + " [" + (weak[weak.length - 1] != null ? weak[weak.length - 1].length : 0) + " bytes])");
buffStrong.append(str(strong[strong.length - 1]) + " [" + (strong[strong.length - 1] != null ? strong[strong.length - 1].length : 0) + " bytes])");
buffDecide.append(str(decide[decide.length - 1]) + " [" + (decide[decide.length - 1] != null ? decide[decide.length - 1].length : 0) + " bytes])");
return "eid=" + execution.getId() + " r=" + getNumber() + " " + buffWeak + " " + buffStrong + " " + buffDecide;
}
......@@ -438,7 +362,7 @@ public class Round implements Serializable {
}
/**
* Limpa toda a informacao que tem sobre o consenso (proposes, weaks, strons e decides)
* Clear all round info.
*/
public void clear() {
......@@ -452,10 +376,8 @@ public class Round implements Serializable {
this.weak = new byte[n][];
this.strong = new byte[n][];
this.decide = new byte[n][];
Arrays.fill((Object[]) weak, null);
Arrays.fill((Object[]) strong, null);
Arrays.fill((Object[]) decide, null);
}
}
......@@ -28,7 +28,6 @@ public class TOMConfiguration extends Configuration {
protected int n;
protected int f;
protected int requestTimeout;
protected int freezeInitialTimeout;
protected int tomPeriod;
protected int paxosHighMark;
protected int revivalHighMark;
......@@ -37,20 +36,18 @@ public class TOMConfiguration extends Configuration {
protected int numberOfNonces;
protected int inQueueSize;
protected int outQueueSize;
protected boolean decideMessagesEnabled;
protected boolean shutdownHookEnabled;
protected boolean verifyTimestamps;
protected boolean useSenderThread;
protected RSAKeyLoader rsaLoader;
protected Logger log;
protected int clientServerCommSystem;
private int maxMessageSize;
private int debug;
private int numNIOThreads;
private int commBuffering;
private int useMACs;
private int useSignatures;
private boolean stateTransferEnabled;
private int checkpointPeriod;
private int useControlFlow;
......@@ -84,26 +81,8 @@ public class TOMConfiguration extends Configuration {
f = Integer.parseInt(s);
}
s = (String) configs.remove("system.paxos.freeze.timeout");
if (s == null) {
freezeInitialTimeout = n * 10;
} else {
freezeInitialTimeout = Integer.parseInt(s);
}
s = (String) configs.remove("system.paxos.decideMessages");
if (s == null) {
decideMessagesEnabled = false;
} else {
decideMessagesEnabled = Boolean.parseBoolean(s);
}
s = (String) configs.remove("system.totalordermulticast.timeout");
if (s == null) {
requestTimeout = freezeInitialTimeout / 2;
} else {
requestTimeout = Integer.parseInt(s);
}
s = (String) configs.remove("system.shutdownhook");
shutdownHookEnabled = (s != null)? Boolean.parseBoolean(s): false;
s = (String) configs.remove("system.totalordermulticast.period");
if (s == null) {
......@@ -139,13 +118,6 @@ public class TOMConfiguration extends Configuration {
maxBatchSize = Integer.parseInt(s);
}
s = (String) configs.remove("system.totalordermulticast.maxMessageSize");
if (s == null) {
maxMessageSize = 200; //the same as used in upright
} else {
maxMessageSize = Integer.parseInt(s);
}
s = (String) configs.remove("system.debug");
if (s == null) {
Logger.debug = false;
......@@ -185,14 +157,6 @@ public class TOMConfiguration extends Configuration {
useSenderThread = Boolean.parseBoolean(s);
}
s = (String) configs.remove("system.communication.clientServerCommSystem");
if (s == null) {
clientServerCommSystem = 1;
} else {
clientServerCommSystem = Integer.parseInt(s);
}
s = (String) configs.remove("system.communication.numNIOThreads");
if (s == null) {
numNIOThreads = 2;
......@@ -200,13 +164,6 @@ public class TOMConfiguration extends Configuration {
numNIOThreads = Integer.parseInt(s);
}
s = (String) configs.remove("system.communication.commBuffering");
if (s == null) {
commBuffering = 0;
} else {
commBuffering = Integer.parseInt(s);
}
s = (String) configs.remove("system.communication.useMACs");
if (s == null) {
useMACs = 0;
......@@ -279,19 +236,16 @@ public class TOMConfiguration extends Configuration {
if (s == null) {
outQueueSize = 1000;
} else {
outQueueSize = Integer.parseInt(s);
if (outQueueSize < 1) {
outQueueSize = 1000;
}
}
rsaLoader = new RSAKeyLoader(this, TOMConfiguration.configHome);
} catch (Exception e) {
System.err.println("Wrong system.config file format.");
e.printStackTrace();
e.printStackTrace(System.err);
}
}
......@@ -308,10 +262,6 @@ public class TOMConfiguration extends Configuration {
return ttpId;
}
public int getMaxMessageSize() {
return maxMessageSize;
}
public int getRequestTimeout() {
return requestTimeout;
}
......@@ -328,14 +278,6 @@ public class TOMConfiguration extends Configuration {
return f;
}
public int getTOMPeriod() {
return tomPeriod;
}
public int getFreezeInitialTimeout() {
return freezeInitialTimeout;
}
public int getPaxosHighMark() {
return paxosHighMark;
}
......@@ -348,8 +290,8 @@ public class TOMConfiguration extends Configuration {
return maxBatchSize;
}
public boolean isDecideMessagesEnabled() {
return decideMessagesEnabled;
public boolean isShutdownHookEnabled() {
return shutdownHookEnabled;
}
public boolean isStateTransferEnabled() {
......@@ -372,14 +314,6 @@ public class TOMConfiguration extends Configuration {
return useSenderThread;
}
/**
*
* @return 0 (Netty), 1 (MINA)
*/
public int clientServerCommSystem() {
return clientServerCommSystem;
}
/**
* *
*/
......@@ -392,13 +326,6 @@ public class TOMConfiguration extends Configuration {
return numberOfNonces;
}
/**
* Number of requests from clients buffered by the client communication system before delivering to the TOM Layer
*/
public int getCommBuffering() {
return commBuffering;
}
/**
* Indicates if signatures should be used (1) or not (0) to authenticate client requests
*/
......@@ -427,38 +354,21 @@ public class TOMConfiguration extends Configuration {
return useControlFlow;
}
/* public PublicKey[] getRSAServersPublicKeys() {
try {
return rsaLoader.loadServersPublicKeys();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}*/
public PublicKey getRSAPublicKey(int id) {
try {
return rsaLoader.loadPublicKey(id);
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.err);
return null;
}
}
/* public void increasePortNumber() {
for (int i = 0; i < getN(); i++) {
hosts.setPort(i, hosts.getPort(i) + 1);
}
}
*/
public PrivateKey getRSAPrivateKey() {
try {
return rsaLoader.loadPrivateKey();
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.err);
return null;
}
}
......
......@@ -26,7 +26,7 @@ import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.util.ShutdownThread;
import navigators.smart.tom.util.ShutdownHookThread;
/**
* This class is used to
......@@ -36,19 +36,18 @@ import navigators.smart.tom.util.ShutdownThread;
public abstract class TOMReceiver implements TOMRequestReceiver {
private boolean tomStackCreated = false;
private ReplicaContext replicaCtx = null;
public void init(ServerCommunicationSystem cs, ReconfigurationManager reconfManager) {
this.init(cs, reconfManager,-1,-1);
}
/**
* This metohd initializes the object
* This method initializes the object
*
* @param cs Server side comunication System
* @param cs Server side communication System
* @param conf Total order messaging configuration
*/
//public void init(ServerCommunicationSystem cs, TOMConfiguration conf) {
public void init(ServerCommunicationSystem cs, ReconfigurationManager reconfManager,
int lastExec, int lastLeader) {
if (tomStackCreated) { // if this object was already initialized, don't do it again
......@@ -56,19 +55,11 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
}
//******* EDUARDO BEGIN **************//
// Get group of replicas
//int[] group = new int[conf.getN()];
//for (int i = 0; i < group.length; i++) {
// group[i] = i;
//}
int me = reconfManager.getStaticConf().getProcessId(); // this process ID
if (!reconfManager.isInCurrentView()) {
throw new RuntimeException("I'm not an acceptor!");
}
//******* EDUARDO END **************//
// Assemble the total order messaging layer
......@@ -82,7 +73,8 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
Proposer proposer = new Proposer(cs, messageFactory, reconfManager);
ExecutionManager manager = new ExecutionManager(reconfManager, acceptor, proposer,
me, reconfManager.getStaticConf().getFreezeInitialTimeout());
me);
acceptor.setManager(manager);
proposer.setManager(manager);
......@@ -99,9 +91,22 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
acceptor.setTOMLayer(tomLayer);
Runtime.getRuntime().addShutdownHook(new ShutdownThread(cs,lm,acceptor,manager,tomLayer));
if(reconfManager.getStaticConf().isShutdownHookEnabled()){
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(cs,lm,acceptor,manager,tomLayer));
}
tomLayer.start(); // start the layer execution
tomStackCreated = true;
replicaCtx = new ReplicaContext(cs, reconfManager);
}
}
/**
* Obtains the current replica context (getting access to several information
* and capabilities of the replication engine).
*
* @return this replica context
*/
public final ReplicaContext getReplicaContext() {
return replicaCtx;
}
}
......@@ -57,7 +57,7 @@ import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.messages.TOMMessageType;
import navigators.smart.tom.core.timer.RequestsTimer;
import navigators.smart.tom.core.timer.messages.ForwardedMessage;
import navigators.smart.tom.core.timer.ForwardedMessage;
import navigators.smart.tom.util.BatchBuilder;
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.Logger;
......@@ -158,7 +158,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
try {
this.engine = Signature.getInstance("SHA1withRSA");
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.err);
}
this.prk = reconfManager.getStaticConf().getRSAPrivateKey();
......@@ -180,7 +180,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
/**
* Computes an hash for a TOM message
* @param message
* @return Hash for teh specified TOM message
* @return Hash for the specified TOM message
*/
public final byte[] computeHash(byte[] data) {
byte[] ret = null;
......@@ -1102,13 +1102,11 @@ public final class TOMLayer extends Thread implements RequestReceiver {
if (ois.readBoolean()) { // conteudo do ultimo eid decidido
last = ois.readInt();
lastValue = (byte[]) ois.readObject();
//TODO: Falta a prova!
}
lastData = new LastEidData(msg.getSender(), last, lastValue, null);
......@@ -1142,12 +1140,10 @@ public final class TOMLayer extends Thread implements RequestReceiver {
catch_up(ts);
}
} catch (IOException ex) {
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace(System.err);
} catch (ClassNotFoundException ex) {
java.util.logging.Logger.getLogger(TOMLayer.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace(System.err);
}
}
......@@ -1304,7 +1300,6 @@ public final class TOMLayer extends Thread implements RequestReceiver {
r.propValueHash = hash;
r.propValue = propose;
r.deserializedPropValue = checkProposedValue(propose);
r.setDecide(me, hash);
exec.decided(r, hash); // entregar a decisao a delivery thread
}
byte[] tmpval = null;
......
......@@ -16,7 +16,7 @@
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.tom.core.timer.messages;
package navigators.smart.tom.core.timer;
import java.io.IOException;
import java.io.ObjectInput;
......
......@@ -29,7 +29,7 @@ import navigators.smart.tom.core.TOMLayer;
* Print information about the replica when it is shutdown.
*
*/
public class ShutdownThread extends Thread {
public class ShutdownHookThread extends Thread {
private ServerCommunicationSystem scs;
private LeaderModule lm;
......@@ -37,7 +37,7 @@ public class ShutdownThread extends Thread {
private ExecutionManager manager;
private TOMLayer tomLayer;
public ShutdownThread(ServerCommunicationSystem scs, LeaderModule lm,
public ShutdownHookThread(ServerCommunicationSystem scs, LeaderModule lm,
Acceptor acceptor, ExecutionManager manager, TOMLayer tomLayer) {
this.scs = scs;
this.lm = lm;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册