提交 65de0915 编写于 作者: B bessani@gmail.com

HUGE UPDATE!!!

- Improvements on the performance of communication system
- big clean up of the code (many old methods deleted)
- corrected bug in out of context (now, a consensus is only started if a propose is received)
Note: still need to verify if state transfer and leader change works right
上级 6da808a0
......@@ -222,7 +222,8 @@ public class ClientsManager {
public boolean requestReceived(TOMMessage request, boolean fromClient,
boolean storeMessage, ServerCommunicationSystem cs) {
request.receptionTime = System.currentTimeMillis();
request.receptionTime = System.nanoTime();
int clientId = request.getSender();
boolean accounted = false;
......
......@@ -18,21 +18,13 @@
package navigators.smart.communication;
import static navigators.smart.communication.ServerCommunicationSystem.RR_MSG;
import static navigators.smart.communication.ServerCommunicationSystem.RT_MSG;
import static navigators.smart.communication.ServerCommunicationSystem.TOM_REPLY_MSG;
import static navigators.smart.communication.ServerCommunicationSystem.TOM_REQUEST_MSG;
import static navigators.smart.paxosatwar.messages.MessageFactory.COLLECT;
import java.io.ObjectOutputStream;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.statemanagment.SMMessage;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.core.timer.messages.ForwardedMessage;
import navigators.smart.tom.core.timer.messages.RTMessage;
......@@ -66,15 +58,8 @@ public class MessageHandler {
protected void processData(SystemMessage sm) {
if (sm instanceof PaxosMessage) {
PaxosMessage paxosMsg = (PaxosMessage) sm;
//Logger.println("(MessageHandler.processData) PAXOS_MSG received: " + paxosMsg);
if (paxosMsg.getPaxosType() == COLLECT) {
//the proposer layer only handle COLLECT messages
Logger.println("(MessageHandler.processData) delivering COLLECT message");
proposer.deliver(paxosMsg);
} else {
Logger.println("(MessageHandler.processData) delivering a paxos message");
acceptor.deliver(paxosMsg);
}
Logger.println("(MessageHandler.processData) delivering a paxos message");
acceptor.deliver(paxosMsg);
} else if (sm instanceof RTMessage) {
RTMessage rtMsg = (RTMessage) sm;
Logger.println("(MessageHandler.processData) RT_MSG received: " + rtMsg + " (replica " + rtMsg.getSender() + ")");
......@@ -95,82 +80,14 @@ public class MessageHandler {
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
} else if (sm instanceof SMMessage) {
Logger.println("(MessageHandler.processData) receiving a state managment message from replica " + sm.getSender());
SMMessage smsg = (SMMessage) sm;
if (smsg.getType() == TOMUtil.SM_REQUEST) {
tomLayer.SMRequestDeliver(smsg);
}
else {
} else {
tomLayer.SMReplyDeliver(smsg);
}
/******************************************************************/
}
}
protected void getData(SystemMessage msg, int type, ObjectOutputStream obOut) throws Exception {
if (type == TOM_REQUEST_MSG || type == TOM_REPLY_MSG) {
getBytes((TOMMessage) msg, obOut);
} else if (type == RR_MSG || type == RT_MSG) {
obOut.writeObject(msg);
} else {//if (type == PAXOS_MSG){
getBytes((PaxosMessage) msg, obOut);
}
}
//******* EDUARDO BEGIN **************//
//Nao estava sendo usado pra nada!
/* public byte[] getData(SystemMessage msg) {
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
try {
ObjectOutputStream obOut = new ObjectOutputStream(bOut);
obOut.writeObject(msg);
} catch (IOException ex) {
ex.printStackTrace();
}
return bOut.toByteArray();
}*/
//******* EDUARDO END **************//
//utility methods to convert PaxosMessages to bytes and vice-versa
private void getBytes(PaxosMessage msg, ObjectOutputStream obOut) throws Exception {
obOut.writeInt(msg.getNumber());
obOut.writeInt(msg.getRound());
obOut.writeInt(msg.getSender());
obOut.writeInt(msg.getPaxosType());
obOut.writeObject(msg.getValue());
obOut.writeObject(msg.getProof());
}
/*
private PaxosMessage getPaxosMsg(ObjectInputStream obIn) throws Exception {
int number = obIn.readInt();
int round = obIn.readInt();
int from = obIn.readInt();
int paxosT = obIn.readInt();
Object value = obIn.readObject();
Object proof = obIn.readObject();
return new PaxosMessage(paxosT, number, round, from, value, proof);
}
*/
//utility methods to convert TOMMessage to bytes and vice-versa
private void getBytes(TOMMessage msg, ObjectOutputStream obOut) throws Exception {
obOut.writeInt(msg.getSender());
obOut.writeInt(msg.getSequence());
obOut.writeObject(msg.getContent());
}
/*
private TOMMessage getTOMMsg(ObjectInputStream obIn) throws Exception {
int sender = obIn.readInt();
int sequence = obIn.readInt();
Object content = obIn.readObject();
return new TOMMessage(sender, sequence, content, TOM_REQUEST_MSG);
}
*/
}
......@@ -30,7 +30,6 @@ import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;
......@@ -44,10 +43,6 @@ public class ServerCommunicationSystem extends Thread {
public static int TOM_REQUEST_MSG = 1;
public static int TOM_REPLY_MSG = 2;
public static int PAXOS_MSG = 3;
public static int RR_MSG = 4;
public static int RT_MSG = 5;
//public static int IN_QUEUE_SIZE = 200;
private LinkedBlockingQueue<SystemMessage> inQueue = null;//new LinkedBlockingQueue<SystemMessage>(IN_QUEUE_SIZE);
......@@ -131,11 +126,9 @@ public class ServerCommunicationSystem extends Thread {
if (count % 1000==0)
Logger.println("(ServerCommunicationSystem.run) After "+count+" messages, inQueue size="+inQueue.size());
Logger.println("(ServerCommunicationSystem.run) Taking a message form the queue and processing it");
SystemMessage sm = inQueue.take();
Logger.println("(ServerCommunicationSystem.run) The message received is from replica " + sm.getSender());
messageHandler.processData(sm);
//System.out.println("Entregou uma msgssssssssss");
} catch (InterruptedException ex) {
java.util.logging.Logger.getLogger(ServerCommunicationSystem.class.getName()).log(Level.SEVERE, null, ex);
}
......
......@@ -16,7 +16,7 @@
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.tom.core.messages;
package navigators.smart.communication;
import java.io.Externalizable;
import java.io.IOException;
......
......@@ -26,7 +26,7 @@ import navigators.smart.tom.core.messages.TOMMessage;
* @author Paulo
*/
public interface CommunicationSystemClientSide {
public void send(boolean sign, int[] targets, TOMMessage sm, boolean serializeClassHeaders);
public void send(boolean sign, int[] targets, TOMMessage sm);
public void setReplyReceiver(ReplyReceiver trr);
public void sign(TOMMessage sm);
public void close();
......
......@@ -20,7 +20,6 @@ package navigators.smart.communication.client.netty;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
......@@ -30,6 +29,8 @@ import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
......@@ -149,16 +150,13 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
}
}
//TODO: Falta fechar as conexoes para servidores q sairam
@Override
public void updateConnections() {
try {
//******* EDUARDO BEGIN **************//
Mac macDummy = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
int[] currV = manager.getCurrentViewProcesses();
//******* EDUARDO END **************//
//open connections with new servers
for (int i = 0; i < currV.length; i++) {
rl.readLock().lock();
if (sessionTable.get(currV[i]) == null) {
......@@ -177,7 +175,10 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
bootstrap.setOption("connectTimeoutMillis", 10000);
// Set up the default event pipeline.
bootstrap.setPipelineFactory(new NettyClientPipelineFactory(this, true, sessionTable, authKey, macDummy.getMacLength(), manager, rl, signatureLength, new ReentrantLock()));
bootstrap.setPipelineFactory(
new NettyClientPipelineFactory(this, true, sessionTable,
authKey, macDummy.getMacLength(), manager, rl, signatureLength,
new ReentrantLock()));
//******* EDUARDO BEGIN **************//
......@@ -202,6 +203,28 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
rl.writeLock().unlock();
} else rl.readLock().unlock();
}
//close connections with removed servers
//ANB: This code need to be tested!!!
ListIterator ids = new LinkedList(sessionTable.keySet()).listIterator();
while (ids.hasNext()) {
int id = (Integer) ids.next();
boolean found = false;
for (int v : currV) {
if (v == id) {
found = true;
break;
}
}
if (!found) {
NettyClientServerSession cs =
(NettyClientServerSession) sessionTable.remove(id);
cs.getChannel().close();
}
}
} catch (NoSuchAlgorithmException ex) {
}
}
......@@ -296,71 +319,40 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
}
@Override
public void send(boolean sign, int[] targets, TOMMessage sm, boolean serializeClassHeaders) {
public void send(boolean sign, int[] targets, TOMMessage sm) {
if (sm.serializedMessage == null) {
//serialize message
DataOutputStream dos = null;
ObjectOutputStream oos = null;
byte[] data = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
if (!serializeClassHeaders) {
dos = new DataOutputStream(baos);
sm.wExternal(dos);
dos.flush();
sm.includesClassHeader = false;
} else {
oos = new ObjectOutputStream(baos);
oos.writeObject(sm);
oos.flush();
sm.includesClassHeader = true;
}
data = baos.toByteArray();
sm.serializedMessage = data;
dos = new DataOutputStream(baos);
sm.wExternal(dos);
dos.flush();
sm.serializedMessage = baos.toByteArray();
} catch (IOException ex) {
Logger.println("Impossible to serialize message: " + sm);
} finally {
try {
if (dos != null) {
dos.close();
}
if (oos != null) {
oos.close();
}
} catch (IOException ex) {
}
dos.close();
} catch (IOException ex) { }
}
} else {
sm.includesClassHeader = false;
}
//Logger.println("Sending message with "+sm.serializedMessage.length+" bytes of content.");
//produce signature
if (sm.serializedMessageSignature == null && sign) {
//******* EDUARDO BEGIN **************//
byte[] data2 = signMessage(manager.getStaticConf().getRSAPrivateKey(), sm.serializedMessage);
//******* EDUARDO END **************//
sm.serializedMessageSignature = data2;
if (sign && sm.serializedMessageSignature == null) {
sm.serializedMessageSignature = signMessage(
manager.getStaticConf().getRSAPrivateKey(), sm.serializedMessage);
}
int sent = 0;
for (int i = targets.length - 1; i >= 0; i--) {
/**********************************************************/
/********************MALICIOUS CODE************************/
/**********************************************************/
//don't send the message to server 0 if my id is 5
/*
if (conf.getProcessId() == 5 && (i == 0)) {
continue;
}
*/
/**********************************************************/
/**********************************************************/
/**********************************************************/
sm.destination = targets[i];
rl.readLock().lock();
Channel channel = (Channel) ((NettyClientServerSession) sessionTable.get(targets[i])).getChannel();
Channel channel = ((NettyClientServerSession) sessionTable.get(targets[i])).getChannel();
rl.readLock().unlock();
if (channel.isConnected()) {
sm.signed = sign;
......@@ -371,8 +363,9 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
}
}
if (sent < manager.getCurrentViewF() + 1) {
//if less than f+1 servers are connected send an exception to the client
throw new RuntimeException("Impossible to connect to servers!");
}
}
......@@ -392,8 +385,7 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
} finally {
try {
dos.close();
} catch (IOException ex) {
}
} catch (IOException ex) { }
}
//******* EDUARDO BEGIN **************//
......
......@@ -15,13 +15,11 @@
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.communication.client.netty;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
......@@ -36,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
......@@ -45,9 +42,9 @@ import javax.crypto.spec.PBEKeySpec;
import navigators.smart.communication.client.CommunicationSystemServerSide;
import navigators.smart.communication.client.RequestReceiver;
import navigators.smart.communication.server.ServerConnection;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;
import org.jboss.netty.bootstrap.ServerBootstrap;
......@@ -61,31 +58,20 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
*
* @author Paulo
*/
@ChannelPipelineCoverage("all")
public class NettyClientServerCommunicationSystemServerSide extends SimpleChannelHandler implements CommunicationSystemServerSide {
/**
* number of measures used to calculate statistics
*/
//private final int BENCHMARK_PERIOD = 100000;
public class NettyClientServerCommunicationSystemServerSide extends SimpleChannelHandler implements CommunicationSystemServerSide {
private static final String PASSWORD = "newcs";
private RequestReceiver requestReceiver;
private HashMap sessionTable;
private ReentrantReadWriteLock rl;
private SecretKey authKey;
//private long numReceivedMsgs = 0;
//private long lastMeasurementStart = 0;
//private long max=0;
private List<TOMMessage> requestsReceived = Collections.synchronizedList(new ArrayList<TOMMessage>());
private ReentrantLock lock = new ReentrantLock();
private ReconfigurationManager manager;
public NettyClientServerCommunicationSystemServerSide(ReconfigurationManager manager) {
......@@ -101,16 +87,9 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
//Configure the server.
/* Cached thread pool */
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
/* Fixed thread pool
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newFixedThreadPool(conf.getNumberOfNIOThreads()),
Executors.newFixedThreadPool(conf.getNumberOfNIOThreads())));
*/
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
//******* EDUARDO BEGIN **************//
Mac macDummy = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
......@@ -122,26 +101,24 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
bootstrap.setOption("child.keepAlive", true);
//Set up the default event pipeline.
bootstrap.setPipelineFactory(new NettyServerPipelineFactory(this,false,sessionTable,authKey,macDummy.getMacLength(),manager,rl,TOMUtil.getSignatureSize(manager), new ReentrantLock() ));
bootstrap.setPipelineFactory(new NettyServerPipelineFactory(this, false, sessionTable, authKey, macDummy.getMacLength(), manager, rl, TOMUtil.getSignatureSize(manager), new ReentrantLock()));
//Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress( manager.getStaticConf().getHost(
manager.getStaticConf().getProcessId()),
manager.getStaticConf().getPort( manager.getStaticConf().getProcessId())));
bootstrap.bind(new InetSocketAddress(manager.getStaticConf().getHost(
manager.getStaticConf().getProcessId()),
manager.getStaticConf().getPort(manager.getStaticConf().getProcessId())));
System.out.println("#Bound to port " + manager.getStaticConf().getPort(manager.getStaticConf().getProcessId()));
System.out.println("#myId " + manager.getStaticConf().getProcessId());
System.out.println("#n " + manager.getCurrentViewN());
System.out.println("#f " + manager.getCurrentViewF());
System.out.println("#requestTimeout= " + manager.getStaticConf().getRequestTimeout());
System.out.println("#maxBatch= " + manager.getStaticConf().getMaxBatchSize());
System.out.println("#Using MACs = " + manager.getStaticConf().getUseMACs());
System.out.println("#requestTimeout= " + manager.getStaticConf().getRequestTimeout());
System.out.println("#maxBatch= " + manager.getStaticConf().getMaxBatchSize());
System.out.println("#Using MACs = " + manager.getStaticConf().getUseMACs());
System.out.println("#Using Signatures = " + manager.getStaticConf().getUseSignatures());
//******* EDUARDO END **************//
//******* EDUARDO END **************//
} catch (InvalidKeySpecException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
}
}
......@@ -159,7 +136,7 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
TOMMessage sm = (TOMMessage) e.getMessage();
//******* EDUARDO BEGIN **************//
if (manager.getStaticConf().getCommBuffering()>0) {
if (manager.getStaticConf().getCommBuffering() > 0) {
lock.lock();
requestsReceived.add(sm);
if (requestsReceived.size()>= manager.getStaticConf().getCommBuffering()){
......@@ -172,78 +149,20 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
requestsReceived = new ArrayList<TOMMessage>();
}
lock.unlock();
}
else {
} else {
//delivers message to TOMLayer
requestReceiver.requestReceived(sm);
}
/*obtains and unlocks client lock (that guarantees one thread per client)
rl.readLock().lock();
Lock clientLock = ((NettyClientServerSession)sessionTable.get(sm.getSender())).getLock();
int lastMsgReceived = ((NettyClientServerSession)sessionTable.get(sm.getSender())).getLastMsgReceived();
if (sm.getSequence() != lastMsgReceived+1)
System.out.println("(Netty message received) WARNING: Received request "+sm+" but last message received was "+lastMsgReceived);
((NettyClientServerSession)sessionTable.get(sm.getSender())).setLastMsgReceived(sm.getSequence());
rl.readLock().unlock();
clientLock.unlock();
*/
/*
lock.lock();
numReceivedMsgs++;
if (numReceivedMsgs == 1) {
lastMeasurementStart = System.currentTimeMillis();
} else if (numReceivedMsgs==BENCHMARK_PERIOD) {
long elapsedTime = System.currentTimeMillis() - lastMeasurementStart;
double opsPerSec_ = ((double)BENCHMARK_PERIOD)/(elapsedTime/1000.0);
long opsPerSec = Math.round(opsPerSec_);
if (opsPerSec>max)
max = opsPerSec;
System.out.println("(Netty messageReceived) Last "+BENCHMARK_PERIOD+" NETTY messages were received at a rate of " + opsPerSec + " msgs per second");
System.out.println("(Netty messageReceived) Max NETTY through. until now: " + max + " ops per second");
System.out.println("(Netty messageReceived) Active clients: " + sessionTable.size());
numReceivedMsgs = 0;
}
lock.unlock();
*/
}
/*
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
//System.out.println("Message sent");
TOMMessage sm = (TOMMessage) e.getMessage();
long duration = System.nanoTime() - (Long)session.getAttribute("startInstant");
int counter = (Integer) session.getAttribute("msgCount");
session.setAttribute("msgCount",++counter);
Storage st = (Storage) session.getAttribute("storage");
if (counter>benchmarkPeriod/2){
st.store(duration);
session.setAttribute("storage",st);
}
if (st.getCount()==benchmarkPeriod/2){
System.out.println("TOM delay: Average time for "+benchmarkPeriod/2+" executions (-10%) = "+st.getAverage(true)/1000+ " us ");
System.out.println("TOM delay: Standard desviation for "+benchmarkPeriod/2+" executions (-10%) = "+st.getDP(true)/1000 + " us ");
System.out.println("TOM delay: Average time for "+benchmarkPeriod/2+" executions (all samples) = "+st.getAverage(false)/1000+ " us ");
System.out.println("TOM delay: Standard desviation for "+benchmarkPeriod/2+" executions (all samples) = "+st.getDP(false)/1000 + " us ");
System.out.println("TOM delay: Maximum time for "+benchmarkPeriod/2+" executions (-10%) = "+st.getMax(true)/1000+ " us ");
System.out.println("TOM delay: Maximum time for "+benchmarkPeriod/2+" executions (all samples) = "+st.getMax(false)/1000+ " us ");
st = new Storage(benchmarkPeriod/2);
session.setAttribute("storage",st);
session.setAttribute("msgCount",0);
}
}
*/
@Override
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
navigators.smart.tom.util.Logger.println("Session Created, active clients="+sessionTable.size());
//session.setAttribute("storage",st);
//session.setAttribute("msgCount",0);
navigators.smart.tom.util.Logger.println("Session Created, active clients=" + sessionTable.size());
}
@Override
public void channelClosed(
public void channelClosed(
ChannelHandlerContext ctx, ChannelStateEvent e) {
rl.writeLock().lock();
//removes session from sessionTable
......@@ -256,83 +175,65 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
int key = (Integer) m.getKey();
sessionTable.remove(key);
System.out.println("#Removed client channel with ID= " + key);
System.out.println("#active clients="+sessionTable.size());
System.out.println("#active clients=" + sessionTable.size());
break;
}
}
rl.writeLock().unlock();
navigators.smart.tom.util.Logger.println("Session Closed, active clients="+sessionTable.size());
navigators.smart.tom.util.Logger.println("Session Closed, active clients=" + sessionTable.size());
}
public void setRequestReceiver(RequestReceiver tl) {
this.requestReceiver = tl;
}
@Override
public void send(int[] targets, TOMMessage sm, boolean serializeClassHeaders) {
//serialize message
DataOutputStream dos = null;
ObjectOutputStream oos = null;
//serialize message
DataOutputStream dos = null;
byte[] data = null;
byte[] data = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
dos = new DataOutputStream(baos);
sm.wExternal(dos);
dos.flush();
data = baos.toByteArray();
sm.serializedMessage = data;
} catch (IOException ex) {
Logger.println("Error enconding message.");
} finally {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
if (!serializeClassHeaders) {
dos = new DataOutputStream(baos);
sm.wExternal(dos);
dos.flush();
sm.includesClassHeader = false;
}
else {
oos = new ObjectOutputStream(baos);
oos.writeObject(sm);
oos.flush();
sm.includesClassHeader = true;
}
data = baos.toByteArray();
sm.serializedMessage = data;
} catch (IOException ex) {
Logger.getLogger(NettyClientServerCommunicationSystemClientSide.class.getName()).log(Level.SEVERE, null, ex);
} finally {
try {
if (dos != null) {
dos.close();
}
if (oos != null) {
oos.close();
}
} catch (IOException ex) {
Logger.getLogger(NettyClientServerCommunicationSystemClientSide.class.getName()).log(Level.SEVERE, null, ex);
}
}
dos.close();
} catch (IOException ex) {}
}
//replies are not signed in the current JBP version
sm.signed = false;
//produce signature if necessary (never in the current version)
if (sm.signed){
if (sm.signed) {
//******* EDUARDO BEGIN **************//
byte[] data2 = TOMUtil.signMessage( manager.getStaticConf().getRSAPrivateKey(), data);
byte[] data2 = TOMUtil.signMessage(manager.getStaticConf().getRSAPrivateKey(), data);
//******* EDUARDO END **************//
sm.serializedMessageSignature = data2;
}
for (int i = 0; i < targets.length; i++) {
rl.readLock().lock();
NettyClientServerSession ncss = (NettyClientServerSession)sessionTable.get(targets[i]);
if (ncss!=null){
NettyClientServerSession ncss = (NettyClientServerSession) sessionTable.get(targets[i]);
if (ncss != null) {
Channel session = ncss.getChannel();
rl.readLock().unlock();
sm.destination = targets[i];
//send message
ChannelFuture f = session.write(sm);
try {
f.await();
session.write(sm).await();
} catch (InterruptedException ex) {
Logger.getLogger(NettyClientServerCommunicationSystemServerSide.class.getName()).log(Level.SEVERE, null, ex);
}
}
else
} else {
rl.readLock().unlock();
}
}
}
}
......@@ -15,29 +15,22 @@
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.communication.client.netty;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import navigators.smart.reconfiguration.ViewManager;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
......@@ -46,12 +39,10 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
*
* @author Paulo Sousa
*/
@ChannelPipelineCoverage("one")
public class NettyTOMMessageDecoder extends FrameDecoder {
......@@ -59,7 +50,6 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
* number of measures used to calculate statistics
*/
//private final int BENCHMARK_PERIOD = 10000;
private boolean isClient;
private HashMap sessionTable;
private SecretKey authKey;
......@@ -69,7 +59,6 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
private ViewManager manager;
private boolean firstTime;
private ReentrantReadWriteLock rl;
//******* EDUARDO BEGIN: comentei algumas variaveis que nao estavam sendo usadas **************//
//private long numReceivedMsgs = 0;
//private long lastMeasurementStart = 0;
......@@ -77,11 +66,10 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
//private Storage st;
//private int count = 0;
//******* EDUARDO END **************//
private Signature signatureEngine;
private boolean useMAC;
public NettyTOMMessageDecoder(boolean isClient, HashMap sessionTable, SecretKey authKey, int macLength, ViewManager manager, ReentrantReadWriteLock rl, int signatureLength, boolean useMAC){
public NettyTOMMessageDecoder(boolean isClient, HashMap sessionTable, SecretKey authKey, int macLength, ViewManager manager, ReentrantReadWriteLock rl, int signatureLength, boolean useMAC) {
this.isClient = isClient;
this.sessionTable = sessionTable;
this.authKey = authKey;
......@@ -89,11 +77,11 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
this.macSize = macLength;
this.manager = manager;
this.firstTime = true;
this. rl = rl;
this.rl = rl;
this.signatureSize = signatureLength;
//this.st = new Storage(BENCHMARK_PERIOD);
this.useMAC = useMAC;
navigators.smart.tom.util.Logger.println("new NettyTOMMessageDecoder!!, isClient="+isClient);
navigators.smart.tom.util.Logger.println("new NettyTOMMessageDecoder!!, isClient=" + isClient);
}
@Override
......@@ -107,6 +95,8 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
int dataLength = buffer.getInt(buffer.readerIndex());
//Logger.println("Receiving message with "+dataLength+" bytes.");
// Wait until the whole data is available.
if (buffer.readableBytes() < dataLength + 4) {
return null;
......@@ -115,118 +105,76 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
// Skip the length field because we know it already.
buffer.skipBytes(4);
int totalLength = dataLength-2;
//read control byte indicating if message serialization includes class header
byte hasClassHeader = buffer.readByte();
int totalLength = dataLength - 1;
//read control byte indicating if message is signed
byte signed = buffer.readByte();
int authLength = 0;
if (signed==1)
if (signed == 1) {
authLength += signatureSize;
if (useMAC)
}
if (useMAC) {
authLength += macSize;
}
byte[] data = new byte[totalLength-authLength];
byte[] data = new byte[totalLength - authLength];
buffer.readBytes(data);
byte[] digest = null;
if (useMAC){
if (useMAC) {
digest = new byte[macSize];
buffer.readBytes(digest);
}
byte[] signature = null;
if (signed==1){
if (signed == 1) {
signature = new byte[signatureSize];
buffer.readBytes(signature);
}
DataInputStream dis = null;
ObjectInputStream ois = null;
TOMMessage sm = null;
try {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
if (hasClassHeader==0){
dis = new DataInputStream(bais);
sm = new TOMMessage();
sm.rExternal(dis);
}
else {
//if class headers were serialized
ois = new ObjectInputStream(bais);
sm = (TOMMessage) ois.readObject();
}
//TOMMessage sm = (TOMMessage) ois.readObject();
dis = new DataInputStream(bais);
sm = new TOMMessage();
sm.rExternal(dis);
sm.serializedMessage = data;
if (signed==1){
if (signed == 1) {
sm.serializedMessageSignature = signature;
sm.signed = true;
}
if (useMAC)
if (useMAC) {
sm.serializedMessageMAC = digest;
}
if (isClient){
if (isClient) {
//verify MAC
if (useMAC) {
if (!verifyMAC(sm.getSender(), data, digest)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "MAC error: message discarded");
return null;
}
}
/*
if (signed==1){
if (!verifySignature(sm.getSender(), data, signature)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "Signature error: message discarded");
if (!verifyMAC(sm.getSender(), data, digest)) {
Logger.println("MAC error: message discarded");
return null;
}
}
*/
}
else { /* it's a server */
} else { /* it's a server */
//verifies MAC if it's not the first message received from the client
rl.readLock().lock();
if (sessionTable.containsKey(sm.getSender())){
if (sessionTable.containsKey(sm.getSender())) {
rl.readLock().unlock();
if (useMAC){
if (!verifyMAC(sm.getSender(), data, digest)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "MAC error: message discarded");
if (useMAC) {
if (!verifyMAC(sm.getSender(), data, digest)) {
Logger.println("MAC error: message discarded");
return null;
}
}
if (signed==1) {
/* this is now done by the TOMLayer
if (!verifySignature(sm.getSender(), data, digest)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "Signature error: message from "+sm.getSender()+" discarded");
System.out.println("(signature error) Message: "+TOMUtil.byteArrayToString(data));
System.out.println("(signature error) signature received: "+TOMUtil.byteArrayToString(digest));
return null;
}
*/
}
/*
numReceivedMsgs++;
if (numReceivedMsgs == 1) {
lastMeasurementStart = System.currentTimeMillis();
} else if (numReceivedMsgs==BENCHMARK_PERIOD) {
long elapsedTime = System.currentTimeMillis() - lastMeasurementStart;
double opsPerSec_ = ((double)BENCHMARK_PERIOD)/(elapsedTime/1000.0);
long opsPerSec = Math.round(opsPerSec_);
if (opsPerSec>max)
max = opsPerSec;
br.ufsc.das.tom.util.Logger.println("(Netty decoder) (from: "+sm.getSender()+") Last "+BENCHMARK_PERIOD+" messages were received at a rate of " + opsPerSec + " msgs per second");
br.ufsc.das.tom.util.Logger.println("(Netty decoder) (from: "+sm.getSender()+") Maximum throughput until now: " + max + " msgs per second");
numReceivedMsgs = 0;
}
*/
}
else {
} else {
//creates MAC/publick key stuff if it's the first message received from the client
navigators.smart.tom.util.Logger.println("Creating MAC/public key stuff, first message from client"+sm.getSender());
navigators.smart.tom.util.Logger.println("sessionTable size="+sessionTable.size());
navigators.smart.tom.util.Logger.println("Creating MAC/public key stuff, first message from client" + sm.getSender());
navigators.smart.tom.util.Logger.println("sessionTable size=" + sessionTable.size());
rl.readLock().unlock();
......@@ -235,128 +183,37 @@ public class NettyTOMMessageDecoder extends FrameDecoder {
macSend.init(authKey);
Mac macReceive = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
macReceive.init(authKey);
NettyClientServerSession cs = new NettyClientServerSession(channel,macSend,macReceive,sm.getSender(),manager.getStaticConf().getRSAPublicKey(sm.getSender()), new ReentrantLock());
NettyClientServerSession cs = new NettyClientServerSession(channel, macSend, macReceive, sm.getSender(), manager.getStaticConf().getRSAPublicKey(sm.getSender()), new ReentrantLock());
//******* EDUARDO END **************//
rl.writeLock().lock();
sessionTable.put(sm.getSender(), cs);
System.out.println("#active clients "+sessionTable.size());
navigators.smart.tom.util.Logger.println("#active clients " + sessionTable.size());
rl.writeLock().unlock();
if (useMAC){
if (!verifyMAC(sm.getSender(), data, digest)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "MAC error: message discarded");
return null;
}
}
if (signed==1) {
/* this is now done by the TOMLayer
if (!verifySignature(sm.getSender(), data, digest)){
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.WARNING, "Signature error: message discarded");
return null;
}
*/
if (useMAC && !verifyMAC(sm.getSender(), data, digest)) {
Logger.println("MAC error: message discarded");
return null;
}
}
}
/*
if (st.getCount()==1000){
System.out.println("MAC: Average time for "+benchmarkPeriod+" executions (-10%) = "+this.st.getAverage(true)/1000+ " us ");
System.out.println("MAC: Standard desviation for "+benchmarkPeriod+" executions (-10%) = "+this.st.getDP(true)/1000 + " us ");
System.out.println("MAC: Average time for "+benchmarkPeriod+" executions (all samples) = "+this.st.getAverage(false)/1000+ " us ");
System.out.println("MAC: Standard desviation for "+benchmarkPeriod+" executions (all samples) = "+this.st.getDP(false)/1000 + " us ");
System.out.println("MAC: Maximum time for "+benchmarkPeriod+" executions (-10%) = "+this.st.getMax(true)/1000+ " us ");
System.out.println("MAC: Maximum time for "+benchmarkPeriod+" executions (all samples) = "+this.st.getMax(false)/1000+ " us ");
st = new Storage(benchmarkPeriod);
}
*/
//if I'm a replica then obtain and lock the clientLock in order to guarantee at most one thread per client
/*
if (!isClient) {
rl.readLock().lock();
Lock clientLock = ((NettyClientServerSession)sessionTable.get(sm.getSender())).getLock();
rl.readLock().unlock();
clientLock.lock();
}
*/
return sm;
}
catch (InvalidKeyException ex) {
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
} catch (ClassNotFoundException ex) {
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(NettyTOMMessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
return sm;
} catch (Exception ex) {
navigators.smart.tom.util.Logger.println("Impossible to decode message: "+
ex.getMessage());
ex.printStackTrace();
}
return null;
}
boolean verifyMAC(int id, byte[] data, byte[] digest){
boolean verifyMAC(int id, byte[] data, byte[] digest) {
//long startInstant = System.nanoTime();
rl.readLock().lock();
Mac macReceive = ((NettyClientServerSession)sessionTable.get(id)).getMacReceive();
Mac macReceive = ((NettyClientServerSession) sessionTable.get(id)).getMacReceive();
rl.readLock().unlock();
boolean result = Arrays.equals(macReceive.doFinal(data), digest);
//long duration = System.nanoTime() - startInstant;
//st.store(duration);
return result;
}
boolean verifySignature(int id, byte[] data, byte[] digest){
//long startInstant = System.nanoTime();
rl.readLock().lock();
PublicKey pk = ((NettyClientServerSession)sessionTable.get(id)).getPublicKey();
rl.readLock().unlock();
//long duration = System.nanoTime() - startInstant;
//st.store(duration);
return verifySignatureAux(pk, data, digest);
}
/**
* Verify the signature of a message.
*
* @param key the public key to be used to verify the signature
* @param message the signed message
* @param signature the signature to be verified
* @return the signature
*/
public boolean verifySignatureAux(PublicKey key, byte[] message, byte[] signature) {
long startTime = System.nanoTime();
try {
if (signatureEngine == null) {
signatureEngine = Signature.getInstance("SHA1withRSA");
}
signatureEngine.initVerify(key);
signatureEngine.update(message);
boolean result = signatureEngine.verify(signature);
/*
st.store(System.nanoTime()-startTime);
//statistics about signature execution time
count++;
if (count%BENCHMARK_PERIOD==0){
System.out.println("-- (NettyDecoder) Signature verification benchmark:--");
System.out.println("Average time for " + BENCHMARK_PERIOD + " signature verifications (-10%) = " + st.getAverage(true) / 1000 + " us ");
System.out.println("Standard desviation for " + BENCHMARK_PERIOD + " signature verifications (-10%) = " + st.getDP(true) / 1000 + " us ");
System.out.println("Average time for " + BENCHMARK_PERIOD + " signature verifications (all samples) = " + st.getAverage(false) / 1000 + " us ");
System.out.println("Standard desviation for " + BENCHMARK_PERIOD + " signature verifications (all samples) = " + st.getDP(false) / 1000 + " us ");
System.out.println("Maximum time for " + BENCHMARK_PERIOD + " signature verifications (-10%) = " + st.getMax(true) / 1000 + " us ");
System.out.println("Maximum time for " + BENCHMARK_PERIOD + " signature verifications (all samples) = " + st.getMax(false) / 1000 + " us ");
count = 0;
st.reset();
}
*/
return result;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
......@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.crypto.Mac;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
......@@ -71,12 +72,15 @@ public class NettyTOMMessageEncoder extends SimpleChannelHandler {
if (useMAC)
macData = produceMAC(sm.destination,msgData);
ChannelBuffer buf = buffer(4+1+1+msgData.length+(macData==null?0:macData.length)+(signatureData==null?0:signatureData.length));
int dataLength = 1+msgData.length+(macData==null?0:macData.length)+
(signatureData==null?0:signatureData.length);
//Logger.println("Sending message with "+dataLength+" bytes.");
ChannelBuffer buf = buffer(4+dataLength);
/* msg size */
buf.writeInt(1+1+msgData.length+(macData==null?0:macData.length)+(signatureData==null?0:signatureData.length));
/* control byte indicating if the serialized message includes the class header */
buf.writeByte(sm.includesClassHeader==true?(byte)1:(byte)0);
buf.writeInt(dataLength);
/* control byte indicating if the message is signed or not */
buf.writeByte(sm.signed==true?(byte)1:(byte)0);
/* data to be sent */
......
......@@ -17,7 +17,6 @@
*/
package navigators.smart.communication.server;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
......@@ -25,16 +24,12 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
......@@ -44,7 +39,8 @@ import javax.crypto.spec.PBEKeySpec;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.reconfiguration.TTPMessage;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
import navigators.smart.tom.util.Logger;
/**
* This class represents a connection with other server.
......@@ -87,7 +83,6 @@ public class ServerConnection {
this.inQueue = inQueue;
this.outQueue = new LinkedBlockingQueue<byte[]>(this.manager.getStaticConf().getOutQueueSize());
//******* EDUARDO BEGIN **************//
......@@ -105,9 +100,9 @@ public class ServerConnection {
new DataOutputStream(this.socket.getOutputStream()).writeInt(this.manager.getStaticConf().getProcessId());
} catch (UnknownHostException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace();
} catch (IOException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace();
}
}
//else I have to wait a connection from the remote server
......@@ -115,13 +110,11 @@ public class ServerConnection {
if (this.socket != null) {
try {
socketOutStream = new DataOutputStream(new BufferedOutputStream(this.socket.getOutputStream()));
socketOutStream = new DataOutputStream(this.socket.getOutputStream());
socketInStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
Logger.println("Error creating connection to "+remoteId);
ex.printStackTrace();
}
}
......@@ -129,7 +122,6 @@ public class ServerConnection {
this.useSenderThread = this.manager.getStaticConf().isUseSenderThread();
if (useSenderThread && (this.manager.getStaticConf().getTTPId() != remoteId)) {
//Logger.getLogger(ServerConnection.class.getName()).log(Level.INFO, "Using sender thread.");
new SenderThread().start();
} else {
sendLock = new ReentrantLock();
......@@ -137,7 +129,6 @@ public class ServerConnection {
authenticateAndEstablishAuthKey();
if (!this.manager.getStaticConf().isTheTTP()) {
if (this.manager.getStaticConf().getTTPId() == remoteId) {
//Uma thread "diferente" para as msgs recebidas da TTP
new TTPReceiverThread(replica).start();
......@@ -152,9 +143,8 @@ public class ServerConnection {
* Stop message sending and reception.
*/
public void shutdown() {
//System.out.println("SHUTDOWN para "+remoteId);
Logger.println("SHUTDOWN for "+remoteId);
doWork = false;
closeSocket();
}
......@@ -163,11 +153,10 @@ public class ServerConnection {
* Used to send packets to the remote server.
*/
public final void send(byte[] data) throws InterruptedException {
if (useSenderThread) {
//only enqueue messages if there queue is not full
if (!outQueue.offer(data)) {
navigators.smart.tom.util.Logger.println("(ServerConnection.send) out queue for " + remoteId + " full (message discarded).");
Logger.println("(ServerConnection.send) out queue for " + remoteId + " full (message discarded).");
}
} else {
sendLock.lock();
......@@ -185,18 +174,22 @@ public class ServerConnection {
do {
if (socket != null && socketOutStream != null) {
try {
socketOutStream.writeInt(messageData.length);
socketOutStream.write(messageData);
if (this.manager.getStaticConf().getUseMACs() == 1) {
socketOutStream.write(macSend.doFinal(messageData));
//do an extra copy of the data to be sent, but on a single out stream write
byte[] mac = (this.manager.getStaticConf().getUseMACs() == 1)?macSend.doFinal(messageData):null;
byte[] data = new byte[4+messageData.length+((mac!=null)?mac.length:0)];
int value = messageData.length;
System.arraycopy(new byte[]{(byte)(value >>> 24),(byte)(value >>> 16),(byte)(value >>> 8),(byte)value},0,data,0,4);
System.arraycopy(messageData,0,data,4,messageData.length);
if(mac != null) {
System.arraycopy(mac,0,data,4+messageData.length,mac.length);
}
socketOutStream.flush();
socketOutStream.write(data);
return;
} catch (IOException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
closeSocket();
waitAndConnect();
}
} else {
......@@ -266,23 +259,20 @@ public class ServerConnection {
//******* EDUARDO END **************//
} else {
socket = newSocket;
}
} catch (UnknownHostException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Error connecting", ex);
ex.printStackTrace();
} catch (IOException ex) {
//Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Error connecting", ex);
ex.printStackTrace();
}
if (socket != null) {
try {
socketOutStream = new DataOutputStream(socket.getOutputStream());
socketInStream = new DataInputStream(socket.getInputStream());
} catch (IOException ex) {
ex.printStackTrace();
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
}
}
......@@ -316,12 +306,8 @@ public class ServerConnection {
macReceive = Mac.getInstance(MAC_ALGORITHM);
macReceive.init(authKey);
macSize = macSend.getMacLength();
} catch (InvalidKeySpecException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (InvalidKeyException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (Exception ex) {
ex.printStackTrace();
}
}
......@@ -331,7 +317,7 @@ public class ServerConnection {
socketOutStream.flush();
socket.close();
} catch (IOException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
Logger.println("Error closing socket to "+remoteId);
}
socket = null;
......@@ -351,19 +337,6 @@ public class ServerConnection {
}
}
private final SystemMessage bytesToMessage(byte[] data) {
try {
ObjectInputStream obIn = new ObjectInputStream(new ByteArrayInputStream(data));
return (SystemMessage) obIn.readObject();
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
}
return null;
}
/**
* Thread used to send packets to the remote server.
*/
......@@ -389,7 +362,7 @@ public class ServerConnection {
}
}
Logger.getLogger(ServerConnection.class.getName()).log(Level.INFO, "Sender for " + remoteId + " stoped!");
Logger.println("Sender for " + remoteId + " stopped!");
}
}
......@@ -408,7 +381,7 @@ public class ServerConnection {
try {
receivedMac = new byte[Mac.getInstance(MAC_ALGORITHM).getMacLength()];
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace();
}
while (doWork) {
......@@ -440,31 +413,21 @@ public class ServerConnection {
SystemMessage sm = (SystemMessage) (new ObjectInputStream(new ByteArrayInputStream(data)).readObject());
if (sm.getSender() == remoteId) {
//System.out.println("Mensagem recebia de: "+remoteId);
if (!inQueue.offer(sm)) {
navigators.smart.tom.util.Logger.println("(ReceiverThread.run) in queue full (message from " + remoteId + " discarded).");
Logger.println("(ReceiverThread.run) in queue full (message from " + remoteId + " discarded).");
System.out.println("(ReceiverThread.run) in queue full (message from " + remoteId + " discarded).");
}
}
} else {
//TODO: violation of authentication... we should do something
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "WARNING: Violation of authentication in message received from " + remoteId);
Logger.println("WARNING: Violation of authentication in message received from " + remoteId);
}
/*
} else {
//TODO: invalid MAC... we should do something
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "WARNING: Invalid MAC");
}
*/
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Should never happen,", ex);
//invalid message sent, just ignore;
} catch (IOException ex) {
if (doWork) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Closing socket and reconnecting", ex);
Logger.println("Closing socket and reconnecting");
closeSocket();
waitAndConnect();
}
}
......@@ -472,14 +435,13 @@ public class ServerConnection {
waitAndConnect();
}
}
Logger.getLogger(ServerConnection.class.getName()).log(Level.INFO, "Receiver for " + remoteId + " stoped!");
}
}
//******* EDUARDO BEGIN: thread especial para receber mensagens indicando a entrada no sistema, vindas da da TTP **************//
//Simplesmente entrega a mensagens para a replica, indicando a sua entrada no sistema
//TODO: Ask eduardo why a new thread is needed!!!
//TODO2: Remove all duplicated code
/**
* Thread used to receive packets from the remote server.
......@@ -499,7 +461,6 @@ public class ServerConnection {
try {
receivedMac = new byte[Mac.getInstance(MAC_ALGORITHM).getMacLength()];
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, null, ex);
}
while (doWork) {
......@@ -540,23 +501,13 @@ public class ServerConnection {
}
} else {
//TODO: violation of authentication... we should do something
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "WARNING: Violation of authentication in message received from " + remoteId);
Logger.println("WARNING: Violation of authentication in message received from " + remoteId);
}
/*
} else {
//TODO: invalid MAC... we should do something
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "WARNING: Invalid MAC");
}
*/
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Should never happen,", ex);
ex.printStackTrace();
} catch (IOException ex) {
if (doWork) {
Logger.getLogger(ServerConnection.class.getName()).log(Level.SEVERE, "Closing socket and reconnecting", ex);
closeSocket();
waitAndConnect();
}
}
......@@ -564,8 +515,6 @@ public class ServerConnection {
waitAndConnect();
}
}
Logger.getLogger(ServerConnection.class.getName()).log(Level.INFO, "Receiver for " + remoteId + " stoped!");
}
}
//******* EDUARDO END **************//
......
......@@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
......@@ -38,7 +37,7 @@ import java.util.logging.Logger;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
/**
*
......@@ -150,7 +149,6 @@ public class ServersCommunicationLayer extends Thread {
byte[] data = bOut.toByteArray();
for (int i : targets) {
//br.ufsc.das.tom.util.Logger.println("(ServersCommunicationLayer.send) Sending msg to replica "+i);
try {
if (i == me) {
inQueue.put(sm);
......@@ -165,7 +163,6 @@ public class ServersCommunicationLayer extends Thread {
ex.printStackTrace();
}
}
//br.ufsc.das.tom.util.Logger.println("(ServersCommunicationLayer.send) Finished sending messages to replicas");
}
public void shutdown() {
......@@ -217,8 +214,8 @@ public class ServersCommunicationLayer extends Thread {
//******* EDUARDO BEGIN **************//
if (!this.manager.isInInitView() &&
!this.manager.isInCurrentView() &&
(this.manager.getStaticConf().getTTPId() != remoteId)) {
!this.manager.isInCurrentView() &&
(this.manager.getStaticConf().getTTPId() != remoteId)) {
waitViewLock.lock();
pendingConn.add(new PendingConnection(newSocket, remoteId));
waitViewLock.unlock();
......@@ -227,8 +224,6 @@ public class ServersCommunicationLayer extends Thread {
}
//******* EDUARDO END **************//
} catch (SocketTimeoutException ex) {
//timeout on the accept... do nothing
} catch (IOException ex) {
......
......@@ -22,7 +22,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Storage;
......
......@@ -18,7 +18,6 @@
package navigators.smart.paxosatwar;
import navigators.smart.paxosatwar.executionmanager.Round;
import navigators.smart.paxosatwar.roles.Proposer;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;
......@@ -37,11 +36,11 @@ public class Consensus {
private Round decisionRound = null;
private byte[] decision = null; // decided value
private TOMMessage[] deserializedDecision = null; // decided value (deserialized)
//private final Object sync = new Object();
// TODO: Faz sentido ser public?
public long startTime; // the consensus start time
public long executionTime; // consensus execution time
public int batchSize = 0; //number of messages included in the batch
//for benchmarking
public TOMMessage firstMessageProposed = null;
public int batchSize = 0;
/**
* Creates a new instance of Consensus
......@@ -49,9 +48,8 @@ public class Consensus {
* @param eid The execution ID for this consensus
* @param startTime The consensus start time
*/
public Consensus(int eid, long startTime) {
public Consensus(int eid) {
this.eid = eid;
this.startTime = startTime;
}
public void decided(Round round) {
......@@ -109,6 +107,7 @@ public class Consensus {
public int getId() {
return eid;
}
private void waitForPropose() {
while(decisionRound.deserializedPropValue == null) {
try{
......
......@@ -200,12 +200,6 @@ public final class ExecutionManager {
stoppedMsgsLock.lock();
this.stopped = false;
// We don't want to use timeouts inside paxos anymore
/*if (stoppedRound != null) {
acceptor.scheduleTimeout(stoppedRound);
stoppedRound = null;
}*/
//process stopped messages
while(!stoppedMsgs.isEmpty()){
acceptor.processMessage(stoppedMsgs.remove());
......@@ -262,22 +256,12 @@ public final class ExecutionManager {
boolean canProcessTheMessage = false;
if (
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
// Isto serve para re-direccionar as mensagens para o out of context
// enquanto a replica esta a receber o estado das outras e a actualizar-se
isRetrievingState || // Is this replica retrieving a state?
// Is this not a revived replica?
(!(/*currentConsId == -1 &&*/ lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) &&
/******************************************************************/
(consId > lastConsId && (consId < (lastConsId + paxosHighMark))))
) { // Is this message within the low and high marks (or maybe is the replica synchronizing) ?
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
// Isto serve para re-direccionar as mensagens para o out of context
// enquanto a replica esta a receber o estado das outras e a actualizar-se
if (isRetrievingState || // Is this replica retrieving a state?
(!(lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) && //not a recovered replica
(consId > lastConsId && (consId < (lastConsId + paxosHighMark))))) { // not an ahead of time message
if(stopped) {//just an optimization to avoid calling the lock in normal case
stoppedMsgsLock.lock();
......@@ -288,51 +272,23 @@ public final class ExecutionManager {
stoppedMsgs.add(msg);
}
stoppedMsgsLock.unlock();
return false;
}
if (
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
// Isto serve para re-direccionar as mensagens para o out of context
// enquanto a replica esta a receber o estado das outras e a actualizar-se
isRetrievingState ||
/******************************************************************/
consId > (lastConsId + 1)
) {
Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is out of context, adding it to out of context set");
//store it as an ahead of time message (out of context)
addOutOfContextMessage(msg);
} else {
Logger.println("(ExecutionManager.checkLimits) message for execution "+consId+" can be processed");
canProcessTheMessage = true;
if(isRetrievingState || consId > (lastConsId + 1) ||
(tomLayer.getInExec() != consId && msgType != MessageFactory.PROPOSE)) { //not propose message for the next consensus
System.out.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is out of context, adding it to out of context set");
addOutOfContextMessage(msg);
} else { //can process!
Logger.println("(ExecutionManager.checkLimits) message for execution "+consId+" can be processed");
canProcessTheMessage = true;
}
}
} else if (
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
// Is this replica revived?
(/*currentConsId == -1 &&*/ lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) ||
/******************************************************************/
(consId >= (lastConsId + paxosHighMark))
) { // Does this message exceeds the high mark?
/**
System.out.println("##################################################################################");
System.out.println("- Ahead-of-time message (" + msg + ") discarded");
System.out.println("- If many messages of the same consensus are discarded, the replica can halt!");
System.out.println("- Try to increase the 'system.paxos.highMarc' configuration parameter.");
System.out.println("- Last consensus executed: " + lastConsId);
System.out.println("##################################################################################");
/*/
//TODO: at this point a new state should be recovered from other correct replicas
} else if ((lastConsId == -1 && consId >= (lastConsId + revivalHighMark)) || //recovered...
(consId >= (lastConsId + paxosHighMark))) { //or too late replica
//Start state transfer
/** ISTO E CODIGO DO JOAO, PARA TRATAR DA TRANSFERENCIA DE ESTADO */
Logger.println("(ExecutionManager.checkLimits) Message for execution "+consId+" is beyond the paxos highmark, adding it to out of context set");
Logger.println("(ExecutionManager.checkLimits) Message for execution "+
consId+" is beyond the paxos highmark, adding it to out of context set");
addOutOfContextMessage(msg);
tomLayer.requestState(this.reconfManager.getStaticConf().getProcessId(),
this.reconfManager.getCurrentViewOtherAcceptors(), msg.getSender(), consId);
......@@ -340,7 +296,6 @@ public final class ExecutionManager {
}
outOfContextLock.unlock();
//br.ufsc.das.util.Logger.println("(checkLimits) Mensagem recebida nao estah dentro dos limites");
return canProcessTheMessage;
}
......@@ -349,12 +304,10 @@ public final class ExecutionManager {
* @param eid The ID for the consensus execution in question
* @return True if there are still messages to be processed, false otherwise
*/
public boolean thereArePendentMessages(int eid) {
public boolean thereArePendingMessages(int eid) {
outOfContextLock.lock();
/******* BEGIN OUTOFCONTEXT CRITICAL SECTION *******/
boolean result = outOfContextProposes.get(eid) != null || outOfContext.get(eid) != null;
/******* END OUTOFCONTEXT CRITICAL SECTION *******/
outOfContextLock.unlock();
......@@ -423,8 +376,10 @@ public final class ExecutionManager {
//there is no execution with the given eid
//let's create one...
execution = new Execution(this, new Consensus(eid, System.currentTimeMillis()),
initialTimeout);
Consensus cons = new Consensus(eid);
execution = new Execution(this, cons, initialTimeout);
//...and add it to the executions table
executions.put(eid, execution);
......
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.paxosatwar.executionmanager;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.security.SignedObject;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import navigators.smart.paxosatwar.messages.CollectProof;
import navigators.smart.paxosatwar.messages.FreezeProof;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.timer.messages.RTCollect;
import navigators.smart.tom.leaderchange.CollectData;
/**
* This class is used to process data relacioned with freezed rounds.
* Generate proposes - good values, verify the proposed values and so on...
*/
public class ProofVerifier {
//******* EDUARDO BEGIN: tudo isso deve ser acessado a partir do ReconvigurationManager **************//
//private int numberOfNonces; // Ammount of nonces that have to be delivered to the application
//private PublicKey[] publickeys; // public keys of the replicas
//******* EDUARDO END **************//
private PrivateKey prk = null; // private key for this replica
private Signature engine; // Signature engine
private ReconfigurationManager manager;
/**
* Creates a new instance of ProofVerifier
* @param conf TOM configuration
*/
public ProofVerifier(ReconfigurationManager manager) {
//******* EDUARDO BEGIN **************//
//this.quorumF = conf.getF();
//this.quorumStrong = (int) ((conf.getN() + quorumF) / 2);
//this.numberOfNonces = manager.getStaticConf().getNumberOfNonces();
this.manager = manager;
this.prk = manager.getStaticConf().getRSAPrivateKey();
//******* EDUARDO END **************//
try {
this.engine = Signature.getInstance("SHA1withRSA");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Signs a collect data object
* @param cp a collect data object
* @return Signed collect data object
*/
public SignedObject sign(CollectData collects) {
try {
return new SignedObject(collects, prk, engine);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* Signs proofs of a freezed consensus
* @param cp Proofs of a freezed consensus
* @return Signed proofs
*/
public SignedObject sign(CollectProof cp) {
try {
return new SignedObject(cp, prk, engine);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* TODO: Nao sei para que serve nem o q e um RTCollect. Mas deve ser da difusao atomica
* @param cp
* @return Signed
*/
public SignedObject sign(RTCollect trc) {
try {
return new SignedObject(trc, prk, engine);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* Counts how many proofs are in the given array (how many are diferent from null)
* @param proofs Array of proofs, which might have indexes pointing to null
* @return Number of proofs in the array
*/
public int countProofs(SignedObject[] proofs) {
int validProofs = 0;
for (int i = 0; i < proofs.length; i++) {
if (proofs[i] != null) {
validProofs++;
}
}
return validProofs;
}
/**
* Obtains the value that is considered to be good, as is specified by the PaW algorithm
* @param proofs Signed proofs to be evaluated
* @param in True if the proofs to be evaluated are from the freezed consensus, false for the proofs from the next consensus
* @return The value considered to be good, if any. If such value can't be found, null is returned
*/
public byte[] getGoodValue(SignedObject[] proofs, boolean in) {
try {
CollectProof[] cps = new CollectProof[proofs.length];
for (int i = 0; i < proofs.length; i++) {
if (proofs[i] != null) {
cps[i] = (CollectProof) proofs[i].getObject();
}
}
return getGoodValue(cps, in);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* Checks if an specified array of bytes is contained in a given linked list (whose values are arrays of bytes)
* @param l Linked list containing arrays of bytes
* @param e Array of bytes that is to be search in the linked list
* @return True if 'e' is contained in 'l', false otherwise
*/
private boolean containsArray(LinkedList<byte[]> l, byte[] e) {
for (Iterator<byte[]> i = l.iterator(); i.hasNext();) {
byte[] value = i.next();
if (Arrays.equals(value, e)) {
return true;
}
}
return false;
}
/**
* Obtains the value that is considered to be good, as is specified by the PaW algorithm
* @param proofs Proofs to be evaluated
* @param in True if the proofs to be evaluated are from the freezed consensus, false for the proofs from the next consensus
* @return The value considered to be good, if any. If such value can't be found, null is returned
*/
public byte[] getGoodValue(CollectProof[] proofs, boolean in) {
LinkedList<byte[]> poss = buildPoss(proofs, in);
LinkedList<byte[]> acc = buildAcc(proofs, in);
for (Iterator<byte[]> i = acc.iterator(); i.hasNext();) {
byte[] value = i.next();
if (containsArray(poss, value)) {
return value;
}
}
return null;
}
/**
* Called by acceptors to verify if some proposal is good, as specified by the PaW algorithm
* @param value The proposed value
* @param proofs The proofs to check the value agaisnt
* @param in True if the proofs to be evaluated are from the freezed consensus, false for the proofs from the next consensus
* @return True if the value is good, false otherwise
*/
public boolean good(byte[] value, CollectProof[] proofs, boolean in) {
LinkedList<byte[]> poss = buildPoss(proofs, in);
LinkedList<byte[]> acc = buildAcc(proofs, in);
//condition G2
if (containsArray(acc, value) && (containsArray(poss, value) || poss.isEmpty())) {
return true;
}
//condition G1
if (poss.isEmpty()) {
//alysson: ainda nao estou bem certo q isso esta certo
return true;
}
return false;
}
/**
* Returns the round number of the next consensus's execution from an array of proofs
* @param proof Array of proofs which gives out the round number of next consensus's execution
* @return The number of the round, or -1 if there is not one executing
*/
public int getNextExecRound(CollectProof[] proof) {
for (int i = 0; i < proof.length; i++) {
if (proof[i].getProofs(false) != null) {
int r = proof[i].getProofs(false).getRound();
int c = 1;
for (int j = i + 1; j < proof.length; j++) {
if (proof[j].getProofs(false) != null) {
if (r == proof[j].getProofs(false).getRound()) {
c++;
}
}
}
//******* EDUARDO BEGIN: nova forma de acessar o f **************//
if (c > this.manager.getQuorumF()) {
//******* EDUARDO END **************//
return r;
}
}
}
return -1;
}
/**
* Checks if this is a valid proof
* @param eid Execution ID to match against the proof
* @param round round number to match against the proof
* @param proof Proof to be verified
* @return True if valid, false otherwise
*/
public boolean validProof(int eid, int round, FreezeProof proof) {
// TODO: nao devia ser 'proof.getRound() <= round'?
return (proof != null) && (proof.getEid() == eid) && (proof.getRound() == round);
}
/**
* Returns the valid proofs
* @param eid Execution ID to match against the proofs
* @param round round number to match against the proofs
* @param proof Proofs to be verified
* @return Array the the valid proofs
*/
public CollectProof[] checkValid(int eid, int round, SignedObject[] proof) {
if (proof == null) {
return null;
}
Collection<CollectProof> valid = new HashSet<CollectProof>();
try {
for (int i = 0; i < proof.length; i++) {
if (proof[i] != null && validSignature(proof[i], i)) {
CollectProof cp = (CollectProof) proof[i].getObject();
if (validProof(eid, round, cp.getProofs(true))) {
valid.add(cp);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return valid.toArray(new CollectProof[0]);
}
/**
* Checks if a signature is valid
* @param so Signed object
* @param sender Replica that sent the signed object
* @return True if the signature is valid, false otherwise
*/
public boolean validSignature(SignedObject so, int sender) {
try {
//return so.verify(this.publickeys[sender], engine);
//******* EDUARDO BEGIN **************//
return so.verify(this.manager.getStaticConf().getRSAPublicKey(sender), engine);
//******* EDUARDO END **************//
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* Checks if a replica is the leader, given an array of proofs.
* @param l Replica to be checked
* @param proof Proofs to verify the leader against
* @return True if 'l' is the leader, false otherwise
*/
public boolean isTheLeader(int l, CollectProof[] proof) {
int c = 0;
// A replica is considered to really be the leader, if more than F
// proofs have 'getLeader()' set to be 'l'
for (int i = 0; i < proof.length; i++) {
if (proof[i] != null && proof[i].getLeader() == l) {
c++;
}
}
//******* EDUARDO BEGIN **************//
return c > this.manager.getCurrentViewF();
//******* EDUARDO END **************//
}
/**
* Builds a Poss set as defined in the PaW algorithm
* @param proofs Proofs to be used to create the set
* @param in True if the proofs to be used are from the freezed consensus, false for the proofs from the next consensus
* @return A linked list which stands for the Poss set
*/
private LinkedList<byte[]> buildPoss(CollectProof[] proofs, boolean in) {
LinkedList<byte[]> poss = new LinkedList<byte[]>();
for (int i = 0; i < proofs.length; i++) {
byte[] w = null;
if (proofs[i] != null && proofs[i].getProofs(in) != null) {
w = proofs[i].getProofs(in).getWeak();
}
if (w != null) {
int countW = 0;
int countS = 0;
for (int j = 0; j < proofs.length; j++) {
if (proofs[j] != null && proofs[j].getProofs(in) != null) {
if (Arrays.equals(w, proofs[j].getProofs(in).getWeak())) {
countW++;
}
if (Arrays.equals(w, proofs[j].getProofs(in).getStrong())) {
countS++;
}
}
}
//******* EDUARDO BEGIN **************//
if ((countW > manager.getQuorumStrong() || countS > manager.getQuorumF())
//******* EDUARDO END **************//
&& !poss.contains(w)) {
poss.add(w);
}
}
}
return poss;
}
/**
* Builds a Acc set as defined in the PaW algorithm
* @param proofs Proofs to be used to create the set
* @param in True if the proofs to be used are from the freezed consensus, false for the proofs from the next consensus
* @return A linked list which stands for the Acc set
*/
private LinkedList<byte[]> buildAcc(CollectProof[] proofs, boolean in) {
LinkedList<byte[]> acc = new LinkedList<byte[]>();
for (int i = 0; i < proofs.length; i++) {
byte[] w = null;
if (proofs[i] != null && proofs[i].getProofs(in) != null) {
w = proofs[i].getProofs(in).getWeak();
}
if (w != null) {
int count = 0;
for (int j = 0; j < proofs.length; j++) {
if (proofs[j] != null && proofs[j].getProofs(in) != null &&
Arrays.equals(w, proofs[j].getProofs(in).getWeak())) {
count++;
}
}
//******* EDUARDO BEGIN **************//
if (count > manager.getQuorumF() && !acc.contains(w)) {
//******* EDUARDO END **************//
acc.add(w);
}
}
}
return acc;
}
}
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.paxosatwar.executionmanager;
import java.util.TimerTask;
import navigators.smart.paxosatwar.roles.Acceptor;
/**
* This class implements a timeout for consensus's rounds
*/
public class TimeoutTask extends TimerTask {
private Acceptor acceptor; // The acceptor role of the PaW algorithm
private Round round; // The round to which this timeout is related to
/**
* Creates a new instance of TimeoutTask
* @param acceptor The acceptor role of the PaW algorithm
* @param round The round to which this timeout is related to
*/
public TimeoutTask(Acceptor acceptor, Round round) {
this.acceptor = acceptor;
this.round = round;
}
/**
* Runs this timer
*/
public void run() {
acceptor.timeout(round);
}
}
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.paxosatwar.messages;
import java.io.Serializable;
/**
* Proofs to freezed consensus. This class can contain proofs for two consensus.
* The freezed one, and the next one (if have).
*/
public final class CollectProof implements Serializable {
// Proofs to freezed consensus
private FreezeProof proofIn;
// Proofs to next consensus, if have next - after the freezed one
private FreezeProof proofNext;
// The new leader id
private int newLeader;
/**
* Creates a new instance of CollectProof
* @param proofIn Proofs to freezed consensus
* @param proofNext Proofs to next consensus, if have next - after the freezed one
* @param newLeader The new leader id
*/
public CollectProof(FreezeProof proofIn, FreezeProof proofNext, int newLeader) {
this.proofIn = proofIn;
this.proofNext = proofNext;
this.newLeader = newLeader;
}
/**
* Retrieves the proof
* @param in True for the proof of the freezed consensus, false for the proof of the next consensus
* @return
*/
public FreezeProof getProofs(boolean in){
if(in){
return this.proofIn;
}else{
return this.proofNext;
}
}
/**
* Retrieves the leader ID
* @return The leader ID
*/
public int getLeader(){
return this.newLeader;
}
}
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.paxosatwar.messages;
import java.io.Serializable;
/**
*
* @author edualchieri
*
* Proofs for one (freezed) consensus.
*/
public final class FreezeProof implements Serializable {
private int pid; // Replica ID
private int eid; // Consensus's execution ID
private int round; // Round number
private byte[] weak; // weakly accepted value
private byte[] strong; // strongly accepted value
private byte[] decide; // decided value
/**
* Creates a new instance of FreezeProof
* @param pid Replica ID
* @param eid Consensus's execution ID
* @param round Round number
* @param weak Weakly accepted value
* @param strong Strongly accepted Value
* @param decide Decided value
* @param aut Request authenticators
*/
public FreezeProof(int pid, int eid, int round,
byte[] weak, byte[] strong, byte[] decide) {
this.pid = pid;
this.eid = eid;
this.round = round;
this.weak = weak;
this.strong = strong;
this.decide = decide;
}
/**
* Retrieves the replica ID
* @return Replica ID
*/
public int getPid() {
return pid;
}
/**
* Retrieves the consensus's execution ID
* @return Consensus's execution ID
*/
public int getEid() {
return eid;
}
/**
* Retrieves the round number
* @return Round number
*/
public int getRound() {
return round;
}
/**
* Retrieves the weakly accepted value
* @return Weakly accepted value
*/
public byte[] getWeak() {
return weak;
}
/**
* Retrieves the strongly accepted value
* @return Strongly accepted value
*/
public byte[] getStrong() {
return strong;
}
/**
* Retrieves the decided value
* @return Decided value
*/
public byte[] getDecide() {
return decide;
}
// Overwriten methods below
@Override
public String toString() {
return "W="+str(weak)+" S="+str(strong)+" D="+str(decide);
}
private final String str(byte[] obj) {
return (obj == null)?"*":new String(obj);
}
}
......@@ -23,7 +23,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
/**
......@@ -148,7 +148,7 @@ public class PaxosMessage extends SystemMessage {
}
//WEAK, STRONG, DECIDE and FREEZE does not have associated proofs
if(paxosType == MessageFactory.PROPOSE || paxosType == MessageFactory.COLLECT) {
if(paxosType == MessageFactory.PROPOSE) {
proof = in.readObject();
......@@ -211,47 +211,20 @@ public class PaxosMessage extends SystemMessage {
* @return Message type
*/
public String getPaxosVerboseType() {
if (paxosType==MessageFactory.COLLECT)
return "COLLECT";
else if (paxosType==MessageFactory.DECIDE)
return "DECIDE";
else if (paxosType==MessageFactory.FREEZE)
return "FREEZE";
else if (paxosType==MessageFactory.PROPOSE)
if (paxosType==MessageFactory.PROPOSE)
return "PROPOSE";
else if (paxosType==MessageFactory.STRONG)
return "STRONG";
else if (paxosType==MessageFactory.WEAK)
return "WEAK";
else
return "";
}
// Over-written method
@Override
public String toString() {
return "type="+getPaxosVerboseType()+", number="+getNumber()+", round="+getRound()+", from="+getSender()+", "+
((getProof() instanceof CollectProof[])?
Arrays.asList((CollectProof[])getProof()):getProof());
return "type="+getPaxosVerboseType()+", number="+getNumber()+", round="+
getRound()+", from="+getSender();
}
}
......
......@@ -18,19 +18,10 @@
package navigators.smart.paxosatwar.roles;
import java.security.SignedObject;
import navigators.smart.communication.ServerCommunicationSystem;
import navigators.smart.paxosatwar.executionmanager.Execution;
import navigators.smart.paxosatwar.executionmanager.ExecutionManager;
import navigators.smart.paxosatwar.executionmanager.ProofVerifier;
import navigators.smart.paxosatwar.executionmanager.Round;
import navigators.smart.paxosatwar.messages.CollectProof;
import navigators.smart.paxosatwar.messages.MessageFactory;
import navigators.smart.paxosatwar.messages.PaxosMessage;
import navigators.smart.paxosatwar.messages.Proof;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.util.Logger;
......@@ -41,7 +32,6 @@ public class Proposer {
private ExecutionManager manager = null; // Execution manager of consensus's executions
private MessageFactory factory; // Factory for PaW messages
private ProofVerifier verifier; // Verifier for proofs
private ServerCommunicationSystem communication; // Replicas comunication system
//private TOMConfiguration conf; // TOM configuration
......@@ -55,10 +45,9 @@ public class Proposer {
* @param conf TOM configuration
*/
public Proposer(ServerCommunicationSystem communication, MessageFactory factory,
ProofVerifier verifier, ReconfigurationManager manager) {
ReconfigurationManager manager) {
this.communication = communication;
this.communication.setProposer(this);
this.verifier = verifier;
this.factory = factory;
this.reconfManager = manager;
}
......@@ -79,114 +68,9 @@ public class Proposer {
* @param value Value to be proposed
*/
public void startExecution(int eid, byte[] value) {
/*System.out.println("tam "+this.reconfManager.getCurrentViewAcceptors().length);
for(int i = 0; i < this.reconfManager.getCurrentViewAcceptors().length;i++){
System.out.println("i "+this.reconfManager.getCurrentViewAcceptors()[i]);
}*/
//******* EDUARDO BEGIN **************//
communication.send(this.reconfManager.getCurrentViewAcceptors(),
factory.createPropose(eid, 0, value, null));
//******* EDUARDO END **************//
}
/**
* This method only deals with COLLECT messages.
*
* @param msg the COLLECT message received
*/
public void deliver(PaxosMessage msg) {
if (manager.checkLimits(msg)) {
collectReceived(msg);
}
}
/**
* This method is executed when a COLLECT message is received.
*
* @param msg the collect message
*/
private void collectReceived(PaxosMessage msg) {
Logger.println("(Proposer.collectReceived) COLLECT for "+
msg.getNumber()+","+msg.getRound()+" received.");
Execution execution = manager.getExecution(msg.getNumber());
execution.lock.lock();
SignedObject proof = (SignedObject) msg.getProof();
if (proof != null && verifier.validSignature(proof, msg.getSender())) {
CollectProof cp = null;
try {
cp = (CollectProof) proof.getObject();
} catch (Exception e) {
e.printStackTrace(System.out);
}
Logger.println("(Proposer.collectReceived) signed COLLECT for "+
msg.getNumber()+","+msg.getRound()+" received.");
if ((cp != null) && (cp.getProofs(true) != null) &&
// the received proof (that the round was frozen) should be valid
verifier.validProof(execution.getId(), msg.getRound(), cp.getProofs(true)) &&
// this replica is the current leader
(cp.getLeader() == reconfManager.getStaticConf().getProcessId())) {
int nextRoundNumber = msg.getRound() + 1;
Logger.println("(Proposer.collectReceived) valid COLLECT for starting "+
execution.getId()+","+nextRoundNumber+" received.");
Round round = execution.getRound(nextRoundNumber, this.reconfManager);
round.setCollectProof(msg.getSender(),proof);
//******* EDUARDO BEGIN **************//
if (verifier.countProofs(round.proofs) > reconfManager.getQuorumStrong()) {
//******* EDUARDO END **************//
Logger.println("(Proposer.collectReceived) proposing for "+
execution.getId()+","+nextRoundNumber);
byte[] inProp = verifier.getGoodValue(round.proofs, true);
byte[] nextProp = verifier.getGoodValue(round.proofs, false);
manager.getTOMLayer().imAmTheLeader();
//******* EDUARDO BEGIN **************//
communication.send(this.reconfManager.getCurrentViewAcceptors(),
factory.createPropose(execution.getId(), nextRoundNumber,
inProp, new Proof(round.proofs, nextProp)));
//******* EDUARDO END **************//
}
}
}
execution.lock.unlock();
}
/* Not used in JBP, but can be usefull for systems in which there are processes
that are only proposers
private void paxosMessageReceived(int eid, int rid, int msgType,
int sender, Object value) {
Round round = manager.getExecution(eid).getRound(rid);
if(msgType == WEAK) {
round.setWeak(sender, value);
if(round.countWeak(value) > manager.quorumFastDecide) {
manager.getExecution(eid).decide(value,round.getNumber());
}
} else if(msgType == STRONG) {
round.setStrong(sender, value);
if(round.countStrong(value) > manager.quorum2F) {
manager.getExecution(eid).decide(value,round.getNumber());
}
} else if(msgType == DECIDE) {
round.setDecide(sender, value);
if(round.countDecide(value) > manager.quorumF) {
manager.getExecution(eid).decide(value,round.getNumber());
}
}
}
*/
}
......@@ -52,7 +52,7 @@ public class Reconfiguration {
byte[] signature = TOMUtil.signMessage(proxy.getViewManager().getStaticConf().getRSAPrivateKey(),
request.toString().getBytes());
request.setSignature(signature);
byte[] reply = proxy.invoke(TOMUtil.getBytes(request), ReconfigurationManager.TOM_RECONFIG_REQUEST, false);
byte[] reply = proxy.invoke(TOMUtil.getBytes(request), ReconfigurationManager.TOM_RECONFIG_REQUEST);
request = null;
return (ReconfigureReply)TOMUtil.getObject(reply);
}
......
......@@ -9,7 +9,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
import navigators.smart.paxosatwar.Consensus;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.TOMUtil;
......@@ -24,7 +23,8 @@ public class ReconfigurationManager extends ViewManager {
public static final int REMOVE_SERVER = 1;
public static final int CHANGE_F = 2;
public static final int TOM_NORMAL_REQUEST = 0;
public static final int TOM_RECONFIG_REQUEST = 1;
public static final int TOM_READONLY_REQUEST = 1;
public static final int TOM_RECONFIG_REQUEST = 2;
private int quorumF; // f replicas
private int quorum2F; // f * 2 replicas
private int quorumStrong; // ((n + f) / 2) replicas
......
......@@ -8,7 +8,7 @@ package navigators.smart.reconfiguration;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
/**
*
......
......@@ -265,24 +265,24 @@ public class TOMConfiguration extends Configuration {
s = (String) configs.remove("system.communication.inQueueSize");
if (s == null) {
inQueueSize = 200;
inQueueSize = 1000;
} else {
inQueueSize = Integer.parseInt(s);
if (inQueueSize < 1) {
inQueueSize = 200;
inQueueSize = 1000;
}
}
s = (String) configs.remove("system.communication.outQueueSize");
if (s == null) {
outQueueSize = 200;
outQueueSize = 1000;
} else {
outQueueSize = Integer.parseInt(s);
if (outQueueSize < 1) {
outQueueSize = 200;
outQueueSize = 1000;
}
}
......
......@@ -23,7 +23,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
/**
......
......@@ -17,13 +17,15 @@
*/
package navigators.smart.tom;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.reconfiguration.ReconfigureReply;
......@@ -44,22 +46,17 @@ public class ServiceProxy extends TOMSender {
// Locks for send requests and receive replies
private ReentrantLock canReceiveLock = new ReentrantLock();
private ReentrantLock canSendLock = new ReentrantLock();
private Semaphore sm = new Semaphore(0);
private int reqId = -1; // request id
private int replyQuorum = 0; // size of the reply quorum
private TOMMessage replies[] = null; // Replies from replicas are stored here
private int receivedReplies = 0; // Number of received replies
private TOMMessage response = null; // Reply delivered to the application
private LinkedList<TOMMessage> aheadOfTimeReplies = new LinkedList<TOMMessage>();
//private LinkedList<TOMMessage> aheadOfTimeReplies = new LinkedList<TOMMessage>();
private int invokeTimeout = 60;
private Comparator comparator;
private Extractor extractor;
/**
* Constructor
*
......@@ -89,7 +86,7 @@ public class ServiceProxy extends TOMSender {
* quorum of replies
*/
public ServiceProxy(int processId, String configHome,
Comparator replyComparator, Extractor replyExtractor) {
Comparator replyComparator, Extractor replyExtractor) {
if (configHome == null) {
init(processId);
} else {
......@@ -98,14 +95,16 @@ public class ServiceProxy extends TOMSender {
replies = new TOMMessage[getViewManager().getCurrentViewN()];
comparator = (replyComparator != null)? replyComparator : new Comparator<byte[]>() {
comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return Arrays.equals(o1, o2) ? 0 : -1;
}
};
extractor = (replyExtractor != null)? replyExtractor : new Extractor() {
extractor = (replyExtractor != null) ? replyExtractor : new Extractor() {
@Override
public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) {
return replies[lastReceived];
......@@ -113,7 +112,6 @@ public class ServiceProxy extends TOMSender {
};
}
/**
* Get the amount of time (in seconds) that this proxy will wait for
* servers replies before returning null.
......@@ -142,11 +140,13 @@ public class ServiceProxy extends TOMSender {
* @return The reply from the replicas related to request
*/
public byte[] invoke(byte[] request) {
return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST, false);
return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST);
}
public byte[] invoke(byte[] request, boolean readOnly) {
return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST, readOnly);
int type = (readOnly) ? ReconfigurationManager.TOM_READONLY_REQUEST
: ReconfigurationManager.TOM_NORMAL_REQUEST;
return invoke(request, type);
}
/**
......@@ -157,10 +157,9 @@ public class ServiceProxy extends TOMSender {
* @param request Request to be sent
* @param reqType TOM_NORMAL_REQUESTS for service requests, and other for
* reconfig requests.
* @param readOnly it is a read only request (will not be ordered)
* @return The reply from the replicas related to request
*/
public byte[] invoke(byte[] request, int reqType, boolean readOnly) {
public byte[] invoke(byte[] request, int reqType) {
canSendLock.lock();
// Clean all statefull data to prepare for receiving next replies
......@@ -168,30 +167,32 @@ public class ServiceProxy extends TOMSender {
receivedReplies = 0;
response = null;
replyQuorum = (int) Math.ceil((getViewManager().getCurrentViewN() +
getViewManager().getCurrentViewF()) / 2) + 1;
replyQuorum = (int) Math.ceil((getViewManager().getCurrentViewN()
+ getViewManager().getCurrentViewF()) / 2) + 1;
// Send the request to the replicas, and get its ID
reqId = generateRequestId();
TOMulticast(request, reqId, reqType, readOnly);
TOMulticast(request, reqId, reqType);
Logger.println("Sending request (readOnly = "+readOnly+") with reqId="+reqId);
Logger.println("Expected number of matching replies: "+replyQuorum);
Logger.println("Sending request (readOnly = "
+ (reqType == ReconfigurationManager.TOM_READONLY_REQUEST)
+ ") with reqId=" + reqId);
Logger.println("Expected number of matching replies: " + replyQuorum);
// This instruction blocks the thread, until a response is obtained.
// The thread will be unblocked when the method replyReceived is invoked
// by the client side communication system
try {
if(!this.sm.tryAcquire(invokeTimeout, TimeUnit.SECONDS)) {
if (!this.sm.tryAcquire(invokeTimeout, TimeUnit.SECONDS)) {
Logger.println("###################TIMEOUT#######################");
Logger.println("Reply timeout for reqId="+reqId);
Logger.println("Reply timeout for reqId=" + reqId);
canSendLock.unlock();
return null;
}
} catch (InterruptedException ex) {
}
Logger.println("Response extracted = "+response);
Logger.println("Response extracted = " + response);
byte[] ret = null;
......@@ -201,10 +202,10 @@ public class ServiceProxy extends TOMSender {
Logger.println("Received n-f replies and no response could be extracted.");
canSendLock.unlock();
if(readOnly) {
if (reqType == ReconfigurationManager.TOM_READONLY_REQUEST) {
//invoke the operation again, whitout the read-only flag
Logger.println("###################RETRY#######################");
return invoke(request, reqType, false);
return invoke(request);
} else {
throw new RuntimeException("Received n-f replies without f+1 of them matching.");
}
......@@ -220,7 +221,7 @@ public class ServiceProxy extends TOMSender {
reconfigureTo((View) TOMUtil.getObject(response.getContent()));
canSendLock.unlock();
return invoke(request, reqType, readOnly);
return invoke(request, reqType);
}
} else {
//Reply to a reconfigure request!
......@@ -232,7 +233,7 @@ public class ServiceProxy extends TOMSender {
reconfigureTo((View) r);
canSendLock.unlock();
return invoke(request, reqType, readOnly);
return invoke(request, reqType);
} else { //reconfiguration executed!
reconfigureTo(((ReconfigureReply) r).getView());
ret = response.getContent();
......@@ -252,7 +253,7 @@ public class ServiceProxy extends TOMSender {
//******* EDUARDO BEGIN **************//
private void reconfigureTo(View v) {
Logger.println("Installing a most up-to-date view with id="+v.getId());
Logger.println("Installing a most up-to-date view with id=" + v.getId());
getViewManager().reconfigureTo(v);
replies = new TOMMessage[getViewManager().getCurrentViewN()];
getCommunicationSystem().updateConnections();
......@@ -267,8 +268,8 @@ public class ServiceProxy extends TOMSender {
@Override
public void replyReceived(TOMMessage reply) {
canReceiveLock.lock();
if(reqId==-1){//no message being expected
Logger.println("throwing out request: sender="+reply.getSender()+" reqId="+reply.getSequence());
if (reqId == -1) {//no message being expected
Logger.println("throwing out request: sender=" + reply.getSender() + " reqId=" + reply.getSequence());
canReceiveLock.unlock();
return;
}
......@@ -281,28 +282,38 @@ public class ServiceProxy extends TOMSender {
}
//******* EDUARDO END **************//
/*
if (reply.getSequence() > reqId) { // Is this a reply for the last request sent?
Logger.println("Storing reply from "+reply.getSender()+" with reqId:"+reply.getSequence());
aheadOfTimeReplies.add(reply);
} else if(reply.getSequence() == reqId) {
Logger.println("Receiving reply from "+reply.getSender()+" with reqId:"+reply.getSequence()+". Putting on pos="+pos);
Logger.println("Storing reply from "+reply.getSender()+" with reqId:"+reply.getSequence());
aheadOfTimeReplies.add(reply);
} else
*/
if (reply.getSequence() == reqId) {
Logger.println("Receiving reply from " + reply.getSender() +
" with reqId:" + reply.getSequence() + ". Putting on pos=" + pos);
/*
if(receivedReplies == 0) {
//If this is the first reply received for reqId, lets look at ahead
//of time messages to process possible messages for this reqId that
//were already received
for(ListIterator<TOMMessage> li = aheadOfTimeReplies.listIterator(); li.hasNext(); ) {
TOMMessage rr = li.next();
if(rr.getSequence() == reqId) {
int rpos = getViewManager().getCurrentViewPos(rr.getSender());
if(replies[rpos] == null) receivedReplies++;
replies[rpos] = rr;
li.remove();
}
}
//If this is the first reply received for reqId, lets look at ahead
//of time messages to process possible messages for this reqId that
//were already received
for(ListIterator<TOMMessage> li = aheadOfTimeReplies.listIterator(); li.hasNext(); ) {
TOMMessage rr = li.next();
if(rr.getSequence() == reqId) {
int rpos = getViewManager().getCurrentViewPos(rr.getSender());
if(replies[rpos] == null) receivedReplies++;
replies[rpos] = rr;
li.remove();
}
}
}
*/
if(replies[pos] == null) receivedReplies++;
if (replies[pos] == null) {
receivedReplies++;
}
replies[pos] = reply;
// Compare the reply just received, to the others
......@@ -310,7 +321,7 @@ public class ServiceProxy extends TOMSender {
for (int i = 0; i < replies.length; i++) {
if (i != pos && replies[i] != null && (comparator.compare(replies[i].getContent(), reply.getContent()) == 0)) {
sameContent++;
if (sameContent >= replyQuorum) {
response = extractor.extractResponse(replies, sameContent, pos);
reqId = -1;
......@@ -320,8 +331,8 @@ public class ServiceProxy extends TOMSender {
}
}
if (response == null &&
receivedReplies >= getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) {
if (response == null
&& receivedReplies >= getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) {
//it's not safe to wait for more replies (n-f replies received),
//but there is no response available...
reqId = -1;
......@@ -332,5 +343,4 @@ public class ServiceProxy extends TOMSender {
// Critical section ends here. The semaphore can be released
canReceiveLock.unlock();
}
}
......@@ -49,9 +49,10 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
private boolean isToJoin = false;
private ReentrantLock waitTTPJoinMsgLock = new ReentrantLock();
private Condition canProceed = waitTTPJoinMsgLock.newCondition();
//private byte[] startState = null;
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
private ReentrantLock requestsLock = new ReentrantLock();
private Condition requestsCondition = requestsLock.newCondition();
private boolean isQueueEmpty = true;
/*******************************************************/
......@@ -173,10 +174,10 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
private void initReplica() {
// Initialize messages queue received from the TOM layer
this.requestQueue = new LinkedBlockingQueue<TOMMessage>();
requestQueue = new LinkedBlockingQueue<TOMMessage>();
this.replicaThread = new Thread(this);
this.replicaThread.start(); // starts the replica
replicaThread = new Thread(this);
replicaThread.start(); // starts the replica
}
//******* EDUARDO END **************//
......@@ -190,13 +191,11 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
setState(this.startState);
this.startState = null;
}*/
/**ISTO E CODIGO DO JOAO, PARA TENTAR RESOLVER UM BUG */
cs.start();
/******************************************************/
//******* EDUARDO END **************//
cs.start(); //this is not good! we have to reengineer the initialization code
while (true) {
TOMMessage msg = null;
......@@ -205,11 +204,12 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
} catch (InterruptedException ex) {
continue;
}
msg.requestTotalLatency = System.currentTimeMillis() - msg.consensusStartTime;
// Deliver the message to the application, and get the response
// Deliver the message to the application, and get the response
byte[] response = executeCommand(msg.getSender(), msg.timestamp,
msg.nonces, msg.getContent(), msg.isReadOnlyRequest(), msg.getDebugInfo());
msg.nonces, msg.getContent(),
msg.getReqType() == ReconfigurationManager.TOM_READONLY_REQUEST,
msg.getDebugInfo());
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
requestsLock.lock();
......@@ -270,8 +270,6 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
private ReentrantLock requestsLock = new ReentrantLock();
private Condition requestsCondition = requestsLock.newCondition();
@Override
public byte[] getState() { //TODO: Ha por aqui uma condicao de corrida!
......@@ -314,7 +312,6 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
Logger.getLogger(ServiceReplica.class.getName()).log(Level.SEVERE, null, ex);
}
}
requestsLock.unlock();
}
......
......@@ -21,7 +21,6 @@ package navigators.smart.tom;
import navigators.smart.communication.ServerCommunicationSystem;
import navigators.smart.paxosatwar.executionmanager.ExecutionManager;
import navigators.smart.paxosatwar.executionmanager.LeaderModule;
import navigators.smart.paxosatwar.executionmanager.ProofVerifier;
import navigators.smart.paxosatwar.messages.MessageFactory;
import navigators.smart.paxosatwar.roles.Acceptor;
import navigators.smart.paxosatwar.roles.Proposer;
......@@ -51,7 +50,7 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
*/
//public void init(ServerCommunicationSystem cs, TOMConfiguration conf) {
public void init(ServerCommunicationSystem cs, ReconfigurationManager reconfManager,
int lastExec, int lastLeader) {
int lastExec, int lastLeader) {
if (tomStackCreated) { // if this object was already initialized, don't do it again
return;
}
......@@ -76,13 +75,11 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
MessageFactory messageFactory = new MessageFactory(me);
ProofVerifier proofVerifier = new ProofVerifier(reconfManager);
LeaderModule lm = new LeaderModule(reconfManager);
Acceptor acceptor = new Acceptor(cs, messageFactory, proofVerifier, lm, reconfManager);
Proposer proposer = new Proposer(cs, messageFactory, proofVerifier, reconfManager);
Acceptor acceptor = new Acceptor(cs, messageFactory, lm, reconfManager);
Proposer proposer = new Proposer(cs, messageFactory, reconfManager);
ExecutionManager manager = new ExecutionManager(reconfManager, acceptor, proposer,
me, reconfManager.getStaticConf().getFreezeInitialTimeout());
......@@ -90,11 +87,8 @@ public abstract class TOMReceiver implements TOMRequestReceiver {
acceptor.setManager(manager);
proposer.setManager(manager);
TOMLayer tomLayer = new TOMLayer(manager, this, lm, acceptor, cs, reconfManager, proofVerifier);
TOMLayer tomLayer = new TOMLayer(manager, this, lm, acceptor, cs, reconfManager);
//tomLayer.setLastExec(lastExec);
//tomLayer.lm.decided(lastExec, lastLeader);
manager.setTOMLayer(tomLayer);
//******* EDUARDO BEGIN **************//
......
......@@ -154,8 +154,7 @@ public abstract class TOMSender implements ReplyReceiver {
* @param m Data to be multicast
*/
public void TOMulticast(TOMMessage sm) {
cs.send(useSignatures, this.viewManager.getCurrentViewProcesses(),
sm, false);
cs.send(useSignatures, this.viewManager.getCurrentViewProcesses(), sm);
}
/**
......@@ -163,13 +162,12 @@ public abstract class TOMSender implements ReplyReceiver {
*
* @param m Data to be multicast
* @param reqId unique integer that identifies this request
* @param reqType TOM_NORMAL or TOM_RECONFIGURATION
* @param readOnly it is a readonly request
* @param reqType TOM_NORMAL, TOM_READONLY or TOM_RECONFIGURATION
*/
public void TOMulticast(byte[] m, int reqId, int reqType, boolean readOnly) {
public void TOMulticast(byte[] m, int reqId, int reqType) {
cs.send(useSignatures, viewManager.getCurrentViewProcesses(),
new TOMMessage(me, session, reqId, m, viewManager.getCurrentViewId(),
reqType, readOnly), false);
reqType));
}
/**
......
......@@ -28,6 +28,7 @@ import navigators.smart.statemanagment.TransferableState;
import navigators.smart.tom.TOMRequestReceiver;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.BatchReader;
import navigators.smart.tom.util.DebugInfo;
import navigators.smart.tom.util.Logger;
import navigators.smart.tom.util.TOMUtil;
......@@ -39,7 +40,6 @@ public class DeliveryThread extends Thread {
private LinkedBlockingQueue<Consensus> decided = new LinkedBlockingQueue<Consensus>(); // decided consensus
private TOMLayer tomLayer; // TOM layer
private RequestRecover requestRecover; // TODO: isto ainda vai ser usado?
private TOMRequestReceiver receiver; // Object that receives requests from clients
private ReconfigurationManager manager;
......@@ -56,7 +56,6 @@ public class DeliveryThread extends Thread {
this.receiver = receiver;
//******* EDUARDO BEGIN **************//
this.manager = manager;
this.requestRecover = new RequestRecover(tomLayer, manager);
//******* EDUARDO END **************//
}
......@@ -136,7 +135,7 @@ public class DeliveryThread extends Thread {
/******* Deixo isto comentado, pois nao me parece necessario **********/
/******* Alem disso, esta informacao nao vem no TransferableState **********
requests[i].requestTotalLatency = System.currentTimeMillis()-cons.startTime;
requests[i].requestTotalLatency = System.nanoTime()-cons.startTime;
/***************************************************************************/
//receiver.receiveOrderedMessage(requests[i]);
......@@ -190,23 +189,6 @@ public class DeliveryThread extends Thread {
}
//******* EDUARDO END: Acho que precisa mudar aqui, como na entrega normal **************//
/****** Julgo que isto nao sera necessario ***********
if (conf.getCheckpoint_period() > 0) {
if ((eid > 0) && (eid % conf.getCheckpoint_period() == 0)) {
Logger.println("(DeliveryThread.update) Performing checkpoint for consensus " + eid);
byte[] state2 = receiver.getState();
tomLayer.saveState(state2, eid);
//TODO: possivelmente fazer mais alguma coisa
}
else {
Logger.println("(DeliveryThread.update) Storing message batch in the state log for consensus " + eid);
tomLayer.saveBatch(batch, eid);
//TODO: possivelmente fazer mais alguma coisa
}
}
*/
} catch (Exception e) {
e.printStackTrace(System.out);
}
......@@ -279,18 +261,11 @@ public class DeliveryThread extends Thread {
}
Logger.println("(DeliveryThread.run) A consensus was delivered.");
/******************************************************************/
startTime = System.currentTimeMillis();
startTime = System.nanoTime();
//System.out.println("vai entragar o consenso: "+cons.getId());
//TODO: avoid the case in which the received valid proposal is
//different from the decided value
//System.out.println("chegou aqui 1: "+cons.getId());
//System.out.println("(TESTE // DeliveryThread.run) EID: " + cons.getId() + ", round: " + cons.getDecisionRound() + ", value: " + cons.getDecision().length);
TOMMessage[] requests = (TOMMessage[]) cons.getDeserializedDecision();
......@@ -315,21 +290,17 @@ public class DeliveryThread extends Thread {
}
}
//System.out.println("chegou aqui 4: "+cons.getId());
tomLayer.clientsManager.getClientsLock().lock();
//System.out.println("chegou aqui 5: "+cons.getId());
for (int i = 0; i < requests.length; i++) {
if(requests[0].equals(cons.firstMessageProposed))
requests[0] = cons.firstMessageProposed;
for (int i = 0; i < requests.length; i++) {
/** ISTO E CODIGO DO JOAO, PARA TRATAR DE DEBUGGING */
// if (Logger.debug)
// requests[i].setSequence(new DebugInfo(cons.getId(), cons.getDecisionRound(), lm.getLeader(cons.getId(), cons.getDecisionRound())));
requests[i].setDebugInfo(new DebugInfo(cons.getId(),0,0,requests[i]));
/****************************************************/
requests[i].consensusStartTime = cons.startTime;
requests[i].consensusExecutionTime = cons.executionTime;
requests[i].consensusBatchSize = cons.batchSize;
//System.out.println("chegou aqui 6: "+cons.getId());
tomLayer.clientsManager.requestOrdered(requests[i]);
//System.out.println("chegou aqui 7: "+cons.getId());
......@@ -342,14 +313,12 @@ public class DeliveryThread extends Thread {
//define the last stable consensus... the stable consensus can
//be removed from the leaderManager and the executionManager
/**/
if (cons.getId() > 2) {
int stableConsensus = cons.getId() - 3;
tomLayer.lm.removeStableConsenusInfos(stableConsensus);
tomLayer.execManager.removeExecution(stableConsensus);
}
/**/
//verify if there is a next proposal to be executed
......@@ -362,16 +331,16 @@ public class DeliveryThread extends Thread {
//deliver the request to the application (receiver)
for (int i = 0; i < requests.length; i++) {
requests[i].requestTotalLatency = System.currentTimeMillis() - cons.startTime;
requests[i].deliveryTime = System.nanoTime();
//******* EDUARDO BEGIN **************//
if (requests[i].getViewID() == this.manager.getCurrentViewId()) {
if (requests[i].getReqType() == ReconfigurationManager.TOM_NORMAL_REQUEST) {
//normal request execution
receiver.receiveOrderedMessage(requests[i]);
} else {
//Reconfiguration request processing!
this.manager.enqueueUpdate(requests[i]);
}
} else {
this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
......@@ -379,8 +348,7 @@ public class DeliveryThread extends Thread {
requests[i].getSession(), requests[i].getSequence(),
TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
}
//******* EDUARDO END **************//
//******* EDUARDO END **************//
}
......@@ -433,7 +401,7 @@ public class DeliveryThread extends Thread {
}
}
/********************************************************/
Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.currentTimeMillis() - startTime));
Logger.println("(DeliveryThread.run) All finished for " + cons.getId() + ", took " + (System.nanoTime() - startTime));
} catch (Exception e) {
e.printStackTrace(System.out);
}
......
......@@ -82,7 +82,7 @@ public class OutOfContextMessageThread extends Thread {
/******************************************************************/
int nextExecution = tomLayer.getLastExec() + 1;
if (tomLayer.execManager.thereArePendentMessages(nextExecution)) {
if (tomLayer.execManager.thereArePendingMessages(nextExecution)) {
Logger.println("(OutOfContextMessageThread.run) starting processing out of context messages for consensus " + nextExecution);
execution = tomLayer.execManager.getExecution(nextExecution);
......
......@@ -18,6 +18,7 @@
package navigators.smart.tom.core.messages;
import navigators.smart.communication.SystemMessage;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
......
......@@ -18,6 +18,7 @@
package navigators.smart.tom.core.messages;
import navigators.smart.communication.SystemMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
......@@ -43,8 +44,8 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
private int session; // Sequence number defined by the client
private int sequence; // Sequence number defined by the client
private byte[] content = null; // Content of the message
private boolean readOnlyRequest = false; //this is a read only request
//the fields bellow are not serialized!!!
private transient int id; // ID for this message. It should be unique
......@@ -52,10 +53,8 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
public transient long timestamp = 0; // timestamp to be used by the application
public transient byte[] nonces = null; // nonces to be used by the applciation
//Esses dois Campos servem pra que?
public transient int destination = -1; // message destination
public transient boolean signed = false; // is this message signed?
public transient boolean includesClassHeader = false; //are class header serialized
public transient long receptionTime;//the reception time of this message
public transient boolean timeout = false;//this message was timed out?
......@@ -66,11 +65,14 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
public transient byte[] serializedMessageMAC = null;
//for benchmarking purposes
public transient long consensusStartTime=0;
public transient long consensusExecutionTime=0;
public transient int consensusBatchSize=0;
public transient long requestTotalLatency=0;
public transient long consensusStartTime = 0; //time the consensus is created
public transient long proposeReceivedTime = 0; //time the propose is received
public transient long weakSentTime = 0; //time the replica' weak message is sent
public transient long strongSentTime = 0; //time the replica' strong message is sent
public transient long decisionTime = 0; //time the decision is established
public transient long deliveryTime =0; //time the request is delivered
public transient long executedTime =0; //time the request is executed
//the reply associated with this message
public transient TOMMessage reply = null;
......@@ -81,24 +83,26 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
* Creates a new instance of TOMMessage
*
* @param sender ID of the process which sent the message
* @param session Session id of the sender
* @param sequence Sequence number defined by the client
* @param content Content of the message
* @param view ViewId of the message
*/
public TOMMessage(int sender, int session, int sequence, byte[] content, int view) {
this(sender,session,sequence,content, view, ReconfigurationManager.TOM_NORMAL_REQUEST, false);
this(sender,session,sequence,content, view, ReconfigurationManager.TOM_NORMAL_REQUEST);
}
/**
* Creates a new instance of TOMMessage
*
* @param sender ID of the process which sent the message
* @param session Session id of the sender
* @param sequence Sequence number defined by the client
* @param content Content of the message
* @param view ViewId of the message
* @param type Type of the request
* @param readOnlyRequest it is a read only request
*/
public TOMMessage(int sender, int session, int sequence, byte[] content, int view, int type, boolean readOnlyRequest) {
public TOMMessage(int sender, int session, int sequence, byte[] content, int view, int type) {
super(sender);
this.session = session;
this.sequence = sequence;
......@@ -106,7 +110,6 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
buildId();
this.content = content;
this.reqType = type;
this.readOnlyRequest = readOnlyRequest;
}
......@@ -172,13 +175,6 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
return content;
}
/**
* @return the readOnlyRequest
*/
public boolean isReadOnlyRequest() {
return readOnlyRequest;
}
/**
* Verifies if two TOMMessage objects are equal.
*
......@@ -216,40 +212,6 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
return "(" + sender + "," + sequence + ")";
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(viewID);
out.writeInt(reqType);
out.writeInt(session);
out.writeInt(sequence);
if (content == null) {
out.writeInt(-1);
} else {
out.writeInt(content.length);
out.write(content);
}
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
viewID = in.readInt();
reqType = in.readInt();
session = in.readInt();
sequence = in.readInt();
int toRead = in.readInt();
if (toRead != -1) {
content = new byte[toRead];
//in.readFully(content);
do {
toRead -= in.read(content, content.length - toRead, toRead);
} while (toRead > 0);
}
buildId();
}
public void wExternal(DataOutput out) throws IOException {
out.writeInt(sender);
out.writeInt(viewID);
......@@ -263,8 +225,6 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
out.writeInt(content.length);
out.write(content);
}
out.writeBoolean(isReadOnlyRequest());
}
public void rExternal(DataInput in) throws IOException, ClassNotFoundException {
......@@ -277,12 +237,9 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
int toRead = in.readInt();
if (toRead != -1) {
content = new byte[toRead];
in.readFully(content);
}
readOnlyRequest = in.readBoolean();
buildId();
}
......
......@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
import navigators.smart.tom.core.messages.TOMMessage;
......
......@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import navigators.smart.tom.core.messages.SystemMessage;
import navigators.smart.communication.SystemMessage;
/**
......
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.tom.demo;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.util.DebugInfo;
/**
* Example replica that implements a BFT replicated service (a counter).
*
*/
public class CounterServer extends ServiceReplica {
private int counter = 0;
private int iterations = 0;
public CounterServer(int id) {
super(id);
}
//******* EDUARDO BEGIN **************//
public CounterServer(int id, boolean join) {
super(id,join);
}
//******* EDUARDO END **************//
@Override
public byte[] executeCommand(int clientId, long timestamp, byte[] nonces, byte[] command, boolean readOnly, DebugInfo info) {
//System.out.println("Valor de counter execute "+this.counter);
iterations++;
try {
int increment = new DataInputStream(new ByteArrayInputStream(command)).readInt();
System.out.println("read-only request: "+readOnly);
counter += increment;
if (info == null) System.out.println("[server] (" + iterations + ") Counter incremented: " + counter);
else System.out.println("[server] (" + iterations + " / " + info.eid + ") Counter incremented: " + counter);
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
new DataOutputStream(out).writeInt(counter);
return out.toByteArray();
} catch (IOException ex) {
Logger.getLogger(CounterServer.class.getName()).log(Level.SEVERE, null, ex);
return null;
}
}
public static void main(String[] args){
if(args.length < 1) {
System.out.println("Use: java CounterServer <processId> <join option (optional)>");
System.exit(-1);
}
if(args.length > 1) {
new CounterServer(Integer.parseInt(args[0]), Boolean.valueOf(args[1]));
}else{
new CounterServer(Integer.parseInt(args[0]));
}
}
/** ISTO E CODIGO DO JOAO, PARA TRATAR DOS CHECKPOINTS */
@Override
protected byte[] serializeState() {
//System.out.println("vai ler counter para: "+this.counter);
byte[] b = new byte[4];
for (int i = 0; i < 4; i++) {
int offset = (b.length - 1 - i) * 8;
b[i] = (byte) ((counter >>> offset) & 0xFF);
}
return b;
//throw new UnsupportedOperationException("Not supported yet.");
}
@Override
protected void deserializeState(byte[] state) {
int value = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
value += (state[i] & 0x000000FF) << shift;
}
//System.out.println("vai setar counter para: "+value);
this.counter = value;
// System.out.println("Valor de counter deserializeState "+this.counter);
}
/********************************************************/
}
......@@ -91,7 +91,8 @@ public class LatencyTestClient extends TOMSender {
last_send_instant = System.nanoTime();
this.TOMulticast(command, generateRequestId(),
ReconfigurationManager.TOM_NORMAL_REQUEST, readOnly);
(readOnly)?ReconfigurationManager.TOM_READONLY_REQUEST:
ReconfigurationManager.TOM_NORMAL_REQUEST);
this.sm.acquire();
......
......@@ -28,8 +28,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.util.DebugInfo;
import java.util.Scanner;
......@@ -101,7 +99,6 @@ public class RandomServer extends ServiceReplica {
new DataOutputStream(out).writeInt(value);
return out.toByteArray();
} catch (IOException ex) {
Logger.getLogger(CounterServer.class.getName()).log(Level.SEVERE, null, ex);
return null;
}
}
......
......@@ -134,8 +134,8 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
//do throughput calculations
numDecides++;
//consensusLatencySt.store(msg.consensusExecutionTime);
totalLatencySt1.store(msg.requestTotalLatency);
batchSt1.store(msg.consensusBatchSize);
totalLatencySt1.store(msg.deliveryTime - msg.receptionTime);
//batchSt1.store(msg.consensusBatchSize);
if (numDecides == 1) {
lastDecideTimeInstant = receiveInstant;
......
......@@ -15,78 +15,70 @@
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.tom.demo;
package navigators.smart.tom.demo.microbenchmarks;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import navigators.smart.tom.ServiceProxy;
import navigators.smart.tom.util.Storage;
/**
* Example client that updates a BFT replicated service (a counter).
*
*/
public class CounterClient {
public class LatencyClient {
@SuppressWarnings("static-access")
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: java CounterClient <process id> <increment>");
System.out.println(" if <increment> equals 0 the request will be read-only");
if (args.length < 5) {
System.out.println("Usage: java ...LatencyClient <process id> <number of operations> <request size> <interval> <read only?>");
System.exit(-1);
}
ServiceProxy counterProxy = new ServiceProxy(Integer.parseInt(args[0]));
//counterProxy.setInvokeTimeout(1);
int i = 0;
int inc = Integer.parseInt(args[1]);
//sends 1000 requests to replicas and then terminates
try {
//******* EDUARDO BEGIN **************//
boolean wait = true;
int numberOfOps = Integer.parseInt(args[1]);
int requestSize = Integer.parseInt(args[2]);
int interval = Integer.parseInt(args[3]);
boolean readOnly = Boolean.parseBoolean(args[4]);
BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in));
while (i < 1000) {
if (wait) {
System.out.println("Press Enter...");
byte[] request = new byte[requestSize], reply;
String lido = inReader.readLine();
System.out.println("Warm up...");
if (lido.equals("exit")) {
counterProxy.close();
break;
}else if(lido.equals("go")) {
wait = false;
}
for (int i = 0; i < numberOfOps/2; i++) {
reply = counterProxy.invoke(request, readOnly);
}
Storage st = new Storage(numberOfOps/2);
/*try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}*/
System.out.println("Executing experiment for "+numberOfOps/2+" ops");
//******* EDUARDO END **************//
for (int i = 0; i < numberOfOps/2; i++) {
long last_send_instant = System.nanoTime();
reply = counterProxy.invoke(request, readOnly);
st.store(System.nanoTime() - last_send_instant);
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
new DataOutputStream(out).writeInt(inc);
if (interval > 0) {
//sleeps interval ms before sending next request
Thread.sleep(interval);
}
}
System.out.println("Average time for " + numberOfOps / 2 + " executions (-10%) = " + st.getAverage(true) / 1000 + " us ");
System.out.println("Standard desviation for " + numberOfOps / 2 + " executions (-10%) = " + st.getDP(true) / 1000 + " us ");
System.out.println("Average time for " + numberOfOps / 2 + " executions (all samples) = " + st.getAverage(false) / 1000 + " us ");
System.out.println("Standard desviation for " + numberOfOps / 2 + " executions (all samples) = " + st.getDP(false) / 1000 + " us ");
System.out.println("Maximum time for " + numberOfOps / 2 + " executions (all samples) = " + st.getMax(false) / 1000 + " us ");
System.out.println("Counter sending: " + i);
byte[] reply = counterProxy.invoke(out.toByteArray(), (inc == 0));
int newValue = new DataInputStream(new ByteArrayInputStream(reply)).readInt();
System.out.println("Counter value: " + newValue);
i++;
} catch(Exception e){
} finally {
counterProxy.close();
}
//System.exit(0);
System.exit(0);
}
}
/**
* Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
*
* This file is part of SMaRt.
*
* SMaRt is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SMaRt is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with SMaRt. If not, see <http://www.gnu.org/licenses/>.
*/
package navigators.smart.tom.demo.microbenchmarks;
import navigators.smart.tom.ServiceReplica;
import navigators.smart.tom.util.DebugInfo;
import navigators.smart.tom.util.Storage;
/**
* Simple server that just acknowledge the reception of a request.
*/
public class LatencyServer extends ServiceReplica {
private int interval;
private int hashs;
private int replySize;
private int iterations = 0;
private Storage totalLatency = null;
private Storage consensusLatency = null;
private Storage preConsLatency = null;
private Storage posConsLatency = null;
private Storage proposeLatency = null;
private Storage weakLatency = null;
private Storage strongLatency = null;
public LatencyServer(int id, int interval, int hashs, int replySize) {
super(id);
this.interval = interval;
this.hashs = hashs;
this.replySize = replySize;
totalLatency = new Storage(interval);
consensusLatency = new Storage(interval);
preConsLatency = new Storage(interval);
posConsLatency = new Storage(interval);
proposeLatency = new Storage(interval);
weakLatency = new Storage(interval);
strongLatency = new Storage(interval);
}
@Override
public byte[] executeCommand(int clientId, long timestamp, byte[] nonces, byte[] command, boolean readOnly, DebugInfo info) {
iterations++;
if(info == null) {
return new byte[replySize];
}
info.msg.executedTime = System.nanoTime();
/*
System.out.println("message received at "+info.msg.receptionTime/1000);
System.out.println("consensus started at "+info.msg.consensusStartTime/1000);
System.out.println("propose received at "+info.msg.proposeReceivedTime/1000);
System.out.println("weak sent at "+info.msg.weakSentTime/1000);
System.out.println("strong sent at "+info.msg.strongSentTime/1000);
System.out.println("decided at "+info.msg.decisionTime/1000);
System.out.println("delivered at "+info.msg.deliveryTime/1000);
System.out.println("executed at "+info.msg.executedTime/1000);
*/
totalLatency.store(info.msg.executedTime - info.msg.receptionTime);
consensusLatency.store(info.msg.decisionTime - info.msg.consensusStartTime);
preConsLatency.store(info.msg.consensusStartTime - info.msg.receptionTime);
posConsLatency.store(info.msg.executedTime - info.msg.decisionTime);
proposeLatency.store(info.msg.weakSentTime - info.msg.consensusStartTime);
weakLatency.store(info.msg.strongSentTime - info.msg.weakSentTime);
strongLatency.store(info.msg.decisionTime - info.msg.strongSentTime);
if(iterations % interval == 0) {
System.out.println("------------------------------------------------");
System.out.println("Total latency (" + interval + " executions) = " + totalLatency.getAverage(false) / 1000 + " (+/- "+ (long)totalLatency.getDP(false) / 1000 +") us ");
totalLatency.reset();
System.out.println("Consensus latency (" + interval + " executions) = " + consensusLatency.getAverage(false) / 1000 + " (+/- "+ (long)consensusLatency.getDP(false) / 1000 +") us ");
consensusLatency.reset();
System.out.println("Pre-consensus latency (" + interval + " executions) = " + preConsLatency.getAverage(false) / 1000 + " (+/- "+ (long)preConsLatency.getDP(false) / 1000 +") us ");
preConsLatency.reset();
System.out.println("Pos-consensus latency (" + interval + " executions) = " + posConsLatency.getAverage(false) / 1000 + " (+/- "+ (long)posConsLatency.getDP(false) / 1000 +") us ");
posConsLatency.reset();
System.out.println("Propose latency (" + interval + " executions) = " + proposeLatency.getAverage(false) / 1000 + " (+/- "+ (long)proposeLatency.getDP(false) / 1000 +") us ");
proposeLatency.reset();
System.out.println("Weak latency (" + interval + " executions) = " + weakLatency.getAverage(false) / 1000 + " (+/- "+ (long)weakLatency.getDP(false) / 1000 +") us ");
weakLatency.reset();
System.out.println("Strong latency (" + interval + " executions) = " + strongLatency.getAverage(false) / 1000 + " (+/- "+ (long)strongLatency.getDP(false) / 1000 +") us ");
strongLatency.reset();
}
return new byte[replySize];
}
public static void main(String[] args){
if(args.length < 4) {
System.out.println("Use: java ...LatencyServer <processId> <measurement interval> <processing hashs> <reply size>");
System.exit(-1);
}
int processId = Integer.parseInt(args[0]);
int interval = Integer.parseInt(args[1]);
int hashs = Integer.parseInt(args[2]);
int replySize = Integer.parseInt(args[3]);
new LatencyServer(processId,interval,hashs,replySize);
}
@Override
protected byte[] serializeState() {
return new byte[0];
}
@Override
protected void deserializeState(byte[] state) {
}
}
......@@ -29,9 +29,9 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import navigators.smart.paxosatwar.executionmanager.ProofVerifier;
import navigators.smart.paxosatwar.executionmanager.RoundValuePair;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.TOMMessage;
/**
......@@ -63,6 +63,7 @@ public class LCManager {
//stuff from the TOM layer that this object needss
private ReconfigurationManager reconfManager;
private MessageDigest md;
private TOMLayer tomLayer;
/**
* Constructor
......@@ -70,8 +71,8 @@ public class LCManager {
* @param reconfManager The reconfiguration manager from TOM layer
* @param md The message digest engine from TOM layer
*/
public LCManager(ReconfigurationManager reconfManager, MessageDigest md) {
public LCManager(TOMLayer tomLayer,ReconfigurationManager reconfManager, MessageDigest md) {
this.tomLayer = tomLayer;
this.lastts = 0;
this.nextts = 0;
......@@ -574,13 +575,13 @@ public class LCManager {
* @param eid the eid to which to normalize the collects
* @return a set of correctly signed and normalized collect data structures
*/
public HashSet<CollectData> selectCollects(int ts, int eid, ProofVerifier pv) {
public HashSet<CollectData> selectCollects(int ts, int eid) {
HashSet<SignedObject> c = collects.get(ts);
if (c == null) return null;
return normalizeCollects(getSignedCollects(c, pv), eid);
return normalizeCollects(getSignedCollects(c), eid);
}
......@@ -591,16 +592,16 @@ public class LCManager {
* @param eid the eid to which to normalize the collects
* @return a set of correctly signed and normalized collect data structures
*/
public HashSet<CollectData> selectCollects(HashSet<SignedObject> signedObjects, int eid, ProofVerifier pv) {
public HashSet<CollectData> selectCollects(HashSet<SignedObject> signedObjects, int eid) {
if (signedObjects == null) return null;
return normalizeCollects(getSignedCollects(signedObjects, pv), eid);
return normalizeCollects(getSignedCollects(signedObjects), eid);
}
// Filters the correctly signed collects
private HashSet<CollectData> getSignedCollects(HashSet<SignedObject> signedCollects, ProofVerifier pv) {
private HashSet<CollectData> getSignedCollects(HashSet<SignedObject> signedCollects) {
HashSet<CollectData> colls = new HashSet<CollectData>();
......@@ -608,11 +609,9 @@ public class LCManager {
CollectData c;
try {
c = (CollectData) so.getObject();
int sender = c.getPid();
if (pv.validSignature(so, sender)) {
if (tomLayer.verifySignature(so, sender)) {
colls.add(c);
}
} catch (IOException ex) {
......
......@@ -18,6 +18,7 @@
package navigators.smart.tom.leaderchange;
import navigators.smart.communication.SystemMessage;
import navigators.smart.tom.core.messages.*;
import java.io.IOException;
import java.io.ObjectInput;
......
......@@ -23,6 +23,8 @@
package navigators.smart.tom.util;
import navigators.smart.tom.core.messages.TOMMessage;
/**
*
* @author Joao Sousa
......@@ -32,12 +34,12 @@ public class DebugInfo {
public final int eid;
public final int round;
public final int leader;
public final TOMMessage msg;
public DebugInfo(int eid, int round, int leader) {
public DebugInfo(int eid, int round, int leader, TOMMessage msg) {
this.eid = eid;
this.round = round;
this.leader = leader;
this.msg = msg;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册