提交 3c3714d7 编写于 作者: B bessani@gmail.com

Include session on the messages... corrected the problem of clients block when...

Include session on the messages... corrected the problem of clients block when they ressurrect (so, now a client can run multiple times with the same id)
上级 765317c2
......@@ -32,6 +32,8 @@ public class ClientData {
private int clientId;
//private PublicKey publicKey = null;
private int session = -1;
private int lastMessageReceived = -1;
private long lastMessageReceivedTime = 0;
......@@ -56,6 +58,14 @@ public class ClientData {
return clientId;
}
public int getSession() {
return session;
}
public void setSession(int session) {
this.session = session;
}
public PendingRequests getPendingRequests() {
return pendingRequests;
}
......
......@@ -216,10 +216,6 @@ public class ClientsManager {
public boolean requestReceived(TOMMessage request, boolean fromClient, boolean storeMessage) {
request.receptionTime = System.currentTimeMillis();
int clientId = request.getSender();
boolean accounted = false;
......@@ -231,45 +227,43 @@ public class ClientsManager {
/******* BEGIN CLIENTDATA CRITICAL SECTION ******/
//Logger.println("(ClientsManager.requestReceived) lock for client "+clientData.getClientId()+" acquired");
/*
//for dealing with restarted clients
if ((request.getSequence() == 0) &&
(request.receptionTime - clientData.getLastMessageReceivedTime() >
conf.getReplyVerificationTime())) {
System.out.println("Start accounting messages for client "+clientId);
clientData.setLastMessageReceived(-1);
}
*/
/* ################################################ */
/* ################################################ */
//pjsousa: simple flow control mechanism to avoid out of memory exception
if (manager.getStaticConf().getUseControlFlow()!=0){
if (fromClient && (clientData.getPendingRequests().size()>manager.getStaticConf().getUseControlFlow())){
//clients should not have more than 1000 outstanding messages, otherwise they will be dropped
if (fromClient && (manager.getStaticConf().getUseControlFlow()!=0)){
if (clientData.getPendingRequests().size() > manager.getStaticConf().getUseControlFlow()){
//clients should not have more than defined in the config file
//outstanding messages, otherwise they will be dropped.
//just account for the message reception
clientData.setLastMessageReceived(request.getSequence());
clientData.setLastMessageReceivedTime(request.receptionTime);
clientData.clientLock.unlock();
accounted = false;
return accounted;
return false;
}
}
/* ################################################ */
//new session... just reset the client counter
if(clientData.getSession() != request.getSession()) {
clientData.setSession(request.getSession());
clientData.setLastMessageReceived(-1);
}
/* ################################################ */
if ((clientData.getLastMessageReceived()==-1) || (clientData.getLastMessageReceived() + 1 == request.getSequence()) || ((request.getSequence()>clientData.getLastMessageReceived()) && !fromClient)) {
if ((clientData.getLastMessageReceived()==-1) || //first message received or new session (see above)
(clientData.getLastMessageReceived() + 1 == request.getSequence()) || //message received is the expected
((request.getSequence()>clientData.getLastMessageReceived()) && !fromClient)) {
//it is a new message and I have to verify it's signature
if (!request.signed || clientData.verifySignature(
request.serializedMessage,
if (!request.signed ||
clientData.verifySignature(request.serializedMessage,
request.serializedMessageSignature)) {
//I don't have the message but it is correctly signed, I will
//I don't have the message but it is valid, I will
//insert it in the pending requests of this client
if(storeMessage) {
clientData.getPendingRequests().add(request);
/*
if (request.getSequence()%1000 == 0)
Logger.println("(ClientsManager.requestReceived) client "+clientId+ " pending requests size: "+clientData.getPendingRequests().size());
*/
}
clientData.setLastMessageReceived(request.getSequence());
clientData.setLastMessageReceivedTime(request.receptionTime);
......@@ -280,18 +274,13 @@ public class ClientsManager {
accounted = true;
}
} else {//I will not put this message on the pending requests list
} else {
//I will not put this message on the pending requests list
if (clientData.getLastMessageReceived() >= request.getSequence()) {
//I already have/had this message
accounted = true;
} else {
//it is an invalid message if it's being sent by a client (sequence number > last received + 1)
/*
Logger.println("Ignoring message "+request+" from client "+
clientData.getClientId()+"(last received = "+
clientData.getLastMessageReceived()+"), msg sent by client? "+fromClient);
**/
//a too forward message... the client must be malicious
accounted = false;
}
}
......
......@@ -66,7 +66,7 @@ public class Test {
if(id == 0) {
long time = System.nanoTime();
scl.send(targets, new TOMMessage(id,i,msg.getBytes(),0));
scl.send(targets, new TOMMessage(id,0,i,msg.getBytes(),0));
int rec = 0;
while(rec < n-1) {
......@@ -78,7 +78,7 @@ public class Test {
System.out.println("Roundtrip "+((System.nanoTime()-time)/1000.0)+" us");
} else {
TOMMessage m = (TOMMessage) inQueue.take();
scl.send(new int[]{m.getSender()}, new TOMMessage(id,i,m.getContent(),0));
scl.send(new int[]{m.getSender()}, new TOMMessage(id,0,i,m.getContent(),0));
}
}
......@@ -90,7 +90,7 @@ public class Test {
if(id == 0) {
long time = System.nanoTime();
scl.send(targets, new TOMMessage(id,i,msg.getBytes(),0));
scl.send(targets, new TOMMessage(id,0,i,msg.getBytes(),0));
int rec = 0;
while(rec < n-1) {
......@@ -101,7 +101,7 @@ public class Test {
st.store(System.nanoTime()-time);
} else {
TOMMessage m = (TOMMessage) inQueue.take();
scl.send(new int[]{m.getSender()}, new TOMMessage(id,i,m.getContent(),0));
scl.send(new int[]{m.getSender()}, new TOMMessage(id,0,i,m.getContent(),0));
}
}
......@@ -115,4 +115,3 @@ public class Test {
//scl.shutdown();
}
}
......@@ -33,7 +33,7 @@ public class TestSerialization {
*/
public static void main(String[] args) throws Exception {
// TODO code application logic here
TOMMessage tm = new TOMMessage(0,0,new String("abc").getBytes(),0);
TOMMessage tm = new TOMMessage(0,0,0,new String("abc").getBytes(),0);
ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
DataOutputStream oos = new DataOutputStream(baos);
......
......@@ -18,9 +18,6 @@ public class View implements Serializable {
private int f;
private int[] processes;
public View(int id, int[] processes, int f){
this.id = id;
this.processes = processes;
......
......@@ -244,8 +244,10 @@ public class TOMConfiguration extends Configuration {
s = (String) configs.remove("system.initial.view");
if (s == null) {
//initialView = new int[4];
//initialView[0]=0;initialView[1]=1;initialView[2]=2;initialView[3]=3;
initialView = new int[n];
for(int i=0; i<n; i++) {
initialView[i] = i;
}
} else {
StringTokenizer str = new StringTokenizer(s,",");
initialView = new int[str.countTokens()];
......
......@@ -179,6 +179,7 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
/**
* This method runs the replica code
*/
@Override
public void run() {
//******* EDUARDO BEGIN **************//
if (this.startState != null) {
......@@ -213,8 +214,8 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
}
/********************************************************/
// send reply to the client
cs.send(new int[]{msg.getSender()}, new TOMMessage(id, msg.getSequence(),
response, this.reconfManager.getCurrentViewId()));
cs.send(new int[]{msg.getSender()}, new TOMMessage(id, msg.getSession(),
msg.getSequence(), response, this.reconfManager.getCurrentViewId()));
}
}
......@@ -223,6 +224,7 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
*
* @param msg The request delivered by the TOM layer
*/
@Override
public void receiveOrderedMessage(TOMMessage msg) {
requestQueue.add(msg);
}
......@@ -232,6 +234,7 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
*
* @param msg The request delivered by the TOM layer
*/
@Override
public void receiveMessage(TOMMessage msg) {
requestQueue.add(msg);
}
......@@ -239,6 +242,7 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
private ReentrantLock stateLock = new ReentrantLock();
private Condition stateCondition = stateLock.newCondition();
@Override
public byte[] getState() {
stateLock.lock();
while (!requestQueue.isEmpty()) {
......@@ -255,6 +259,7 @@ public abstract class ServiceReplica extends TOMReceiver implements Runnable {
protected abstract byte[] serializeState();
@Override
public void setState(byte[] state) {
stateLock.lock();
while (!requestQueue.isEmpty()) {
......
......@@ -18,6 +18,7 @@
package navigators.smart.tom;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
......@@ -42,6 +43,7 @@ public abstract class TOMSender implements ReplyReceiver {
private ViewManager viewManager;
//******* EDUARDO END **************//
private int session = 0; // session id
private int sequence = 0; // sequence number
private CommunicationSystemClientSide cs; // Client side comunication system
private Lock lock = new ReentrantLock(); // lock to manage concurrent access to this object by other threads
......@@ -118,6 +120,7 @@ public abstract class TOMSender implements ReplyReceiver {
this.cs.setReplyReceiver(this); // This object itself shall be a reply receiver
this.me = this.viewManager.getStaticConf().getProcessId();
this.useSignatures = this.viewManager.getStaticConf().getUseSignatures()==1?true:false;
this.session = new Random().nextInt();
}
//******* EDUARDO END **************//
......@@ -150,7 +153,7 @@ public abstract class TOMSender implements ReplyReceiver {
*/
public void TOMulticast(byte[] m) {
cs.send(useSignatures, this.viewManager.getCurrentViewProcesses(),
new TOMMessage(me, getNextSequenceNumber(), m,
new TOMMessage(me, session, getNextSequenceNumber(), m,
this.viewManager.getCurrentViewId()), false);
}
......@@ -162,7 +165,7 @@ public abstract class TOMSender implements ReplyReceiver {
*/
public void doTOMulticast(byte[] m, int reqType, boolean readOnly) {
cs.send(useSignatures, this.viewManager.getCurrentViewProcesses(),
new TOMMessage(me, getNextSequenceNumber(), m,
new TOMMessage(me, session, getNextSequenceNumber(), m,
this.viewManager.getCurrentViewId(), reqType, readOnly), false);
}
......@@ -184,7 +187,7 @@ public abstract class TOMSender implements ReplyReceiver {
* @return TOMMessage with serializedMsg and serializedMsgSignature fields filled
*/
public TOMMessage sign(byte[] m) {
TOMMessage tm = new TOMMessage(me, getNextSequenceNumber(), m,
TOMMessage tm = new TOMMessage(me, session, getNextSequenceNumber(), m,
this.viewManager.getCurrentViewId());
cs.sign(tm);
return tm;
......
......@@ -153,7 +153,8 @@ public class DeliveryThread extends Thread {
}
} else {
this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
new TOMMessage(this.manager.getStaticConf().getProcessId(), requests[i].getSequence(),
new TOMMessage(this.manager.getStaticConf().getProcessId(),
requests[i].getSession(), requests[i].getSequence(),
TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
}
......@@ -177,7 +178,8 @@ public class DeliveryThread extends Thread {
for (int i = 0; i < dests.length; i++) {
//System.out.println("Entrou aqui 2");
this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSequence(),
new TOMMessage(this.manager.getStaticConf().getProcessId(),
dests[i].getSession(), dests[i].getSequence(),
response, this.manager.getCurrentViewId()));
}
......@@ -371,7 +373,8 @@ public class DeliveryThread extends Thread {
}
} else {
this.tomLayer.getCommunication().send(new int[]{requests[i].getSender()},
new TOMMessage(this.manager.getStaticConf().getProcessId(), requests[i].getSequence(),
new TOMMessage(this.manager.getStaticConf().getProcessId(),
requests[i].getSession(), requests[i].getSequence(),
TOMUtil.getBytes(this.manager.getCurrentView()), this.manager.getCurrentViewId()));
}
......@@ -393,8 +396,8 @@ public class DeliveryThread extends Thread {
for (int i = 0; i < dests.length; i++) {
//System.out.println("Entrou aqui 2");
this.tomLayer.getCommunication().send(new int[]{dests[i].getSender()},
new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSequence(),
response, this.manager.getCurrentViewId()));
new TOMMessage(this.manager.getStaticConf().getProcessId(), dests[i].getSession(),
dests[i].getSequence(), response, this.manager.getCurrentViewId()));
}
......
......@@ -41,6 +41,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
private int reqType; // request type: application or reconfiguration request
//******* EDUARDO END **************//
private int session; // Sequence number defined by the client
private int sequence; // Sequence number defined by the client
private byte[] content = null; // Content of the message
private boolean readOnlyRequest = false; //this is a read only request
......@@ -80,8 +81,8 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
* @param sequence Sequence number defined by the client
* @param content Content of the message
*/
public TOMMessage(int sender, int sequence, byte[] content, int view) {
this(sender,sequence,content, view, ReconfigurationManager.TOM_NORMAL_REQUEST, false);
public TOMMessage(int sender, int session, int sequence, byte[] content, int view) {
this(sender,session,sequence,content, view, ReconfigurationManager.TOM_NORMAL_REQUEST, false);
}
/**
......@@ -94,8 +95,9 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
* @param readOnlyRequest it is a read only request
*/
public TOMMessage(int sender, int sequence, byte[] content, int view, int type, boolean readOnlyRequest) {
public TOMMessage(int sender, int session, int sequence, byte[] content, int view, int type, boolean readOnlyRequest) {
super(sender);
this.session = session;
this.sequence = sequence;
this.viewID = view;
buildId();
......@@ -121,12 +123,20 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
* Retrieves the debug info from the TOM layer
* @return The debug info from the TOM layer
*/
public void setSequence(DebugInfo info) {
public void setDebugInfo(DebugInfo info) {
this.info = info;
}
/****************************************************/
/**
* Retrieves the session id of this message
* @return The session id of this message
*/
public int getSession() {
return session;
}
/**
* Retrieves the sequence number defined by the client
* @return The sequence number defined by the client
......@@ -208,6 +218,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
super.writeExternal(out);
out.writeInt(viewID);
out.writeInt(reqType);
out.writeInt(session);
out.writeInt(sequence);
if (content == null) {
out.writeInt(-1);
......@@ -222,6 +233,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
super.readExternal(in);
viewID = in.readInt();
reqType = in.readInt();
session = in.readInt();
sequence = in.readInt();
int toRead = in.readInt();
if (toRead != -1) {
......@@ -239,6 +251,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
out.writeInt(sender);
out.writeInt(viewID);
out.writeInt(reqType);
out.writeInt(session);
out.writeInt(sequence);
if (content == null) {
......@@ -255,6 +268,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
sender = in.readInt();
viewID = in.readInt();
reqType = in.readInt();
session = in.readInt();
sequence = in.readInt();
int toRead = in.readInt();
......@@ -269,7 +283,6 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
buildId();
}
/**
* Used to build an unique id for the message
*/
......@@ -312,6 +325,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
return m;
}
@Override
public int compareTo(Object o) {
final int BEFORE = -1;
final int EQUAL = 0;
......@@ -327,6 +341,11 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
if (this.getSender() > tm.getSender())
return AFTER;
if (this.getSession() < tm.getSession())
return BEFORE;
if (this.getSession() > tm.getSession())
return AFTER;
if (this.getSequence() < tm.getSequence())
return BEFORE;
if (this.getSequence() > tm.getSequence())
......@@ -335,4 +354,3 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
return EQUAL;
}
}
......@@ -18,6 +18,7 @@
package navigators.smart.tom.demo;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -32,10 +33,12 @@ public class LatencyTestServer extends TOMReceiver {
private ServerCommunicationSystem cs;
private int id;
private int session;
/** Creates a new instance of TOMServerPerformanceTest */
public LatencyTestServer(int id) {
this.id = id;
this.session = new Random().nextInt();
}
public void run(){
......@@ -57,7 +60,7 @@ public class LatencyTestServer extends TOMReceiver {
}
public void receiveOrderedMessage(TOMMessage msg){
TOMMessage reply = new TOMMessage(id,msg.getSequence(),
TOMMessage reply = new TOMMessage(id,session,msg.getSequence(),
msg.getContent(),msg.getViewID());
// //Logger.println("request received: "+msg.getSender()+
......@@ -66,8 +69,9 @@ public class LatencyTestServer extends TOMReceiver {
cs.send(new int[]{msg.getSender()},reply);
}
@Override
public void receiveMessage(TOMMessage msg) {
TOMMessage reply = new TOMMessage(id,msg.getSequence(),
TOMMessage reply = new TOMMessage(id,session,msg.getSequence(),
msg.getContent(),msg.getViewID());
// //Logger.println("request received: "+msg.getSender()+
......@@ -76,6 +80,7 @@ public class LatencyTestServer extends TOMReceiver {
cs.send(new int[]{msg.getSender()},reply);
}
@Override
public byte[] getState() {
return new byte[1];
}
......
......@@ -18,6 +18,7 @@
package navigators.smart.tom.demo;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -32,10 +33,12 @@ public class LatencyTestServerReplica0 extends TOMReceiver {
private ServerCommunicationSystem cs;
private int id;
private int session;
/** Creates a new instance of TOMServerPerformanceTest */
public LatencyTestServerReplica0(int id) {
this.id = id;
this.session = new Random().nextInt();
}
public void run(){
......@@ -56,8 +59,9 @@ public class LatencyTestServerReplica0 extends TOMReceiver {
/******************************************************/
}
@Override
public void receiveOrderedMessage(TOMMessage msg){
TOMMessage reply = new TOMMessage(id,msg.getSequence(),
TOMMessage reply = new TOMMessage(id,session,msg.getSequence(),
msg.getContent(),msg.getViewID());
// //Logger.println("request received: "+msg.getSender()+
......@@ -78,14 +82,17 @@ public class LatencyTestServerReplica0 extends TOMReceiver {
new LatencyTestServerReplica0(0).run();
}
@Override
public byte[] getState() {
return new byte[1];
}
@Override
public void setState(byte[] state) {
}
@Override
public void receiveMessage(TOMMessage msg) {
throw new UnsupportedOperationException("Not supported yet.");
}
......
......@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -38,6 +39,7 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
private ServerCommunicationSystem cs;
private int id;
private int session;
private int interval;
private long numDecides=0;
private long lastDecideTimeInstant;
......@@ -54,6 +56,7 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
public ThroughputLatencyTestServer(int id, int interval, int averageIterations) {
this.id = id;
this.session = new Random().nextInt();
this.interval = interval;
this.totalOps = 0;
this.averageIterations = averageIterations;
......@@ -86,6 +89,7 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
/******************************************************/
}
@Override
public void receiveOrderedMessage(TOMMessage msg){
long receiveInstant = System.currentTimeMillis();
......@@ -115,14 +119,14 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
Logger.getLogger(ThroughputLatencyTestServer.class.getName()).log(Level.SEVERE, null, ex);
}
System.arraycopy(out.toByteArray(), 0, command, 0, 12);
TOMMessage reply = new TOMMessage(id,msg.getSequence(),
TOMMessage reply = new TOMMessage(id,session,msg.getSequence(),
command,msg.getViewID());
cs.send(new int[]{msg.getSender()},reply);
}
else {
//echo msg to client
//System.out.println("Echoing msg to client");
TOMMessage reply = new TOMMessage(id,msg.getSequence(),
TOMMessage reply = new TOMMessage(id,session,msg.getSequence(),
msg.getContent(),msg.getViewID());
cs.send(new int[]{msg.getSender()},reply);
}
......@@ -182,14 +186,17 @@ public class ThroughputLatencyTestServer extends TOMReceiver {
new ThroughputLatencyTestServer(Integer.parseInt(args[0]),Integer.parseInt(args[1]),Integer.parseInt(args[2])).run();
}
@Override
public byte[] getState() {
return new byte[1];
}
@Override
public void setState(byte[] state) {
}
@Override
public void receiveMessage(TOMMessage msg) {
throw new UnsupportedOperationException("Not supported yet.");
}
......
......@@ -68,7 +68,7 @@ public class ThroughputTestClient extends TOMSender implements Runnable {
//LinkedList<TOMMessage> generatedMessages = null;
try {
System.out.println("(" + currentId + ") A dormir 10 segundos ah espera dos outros clientes");
System.out.println("(" + currentId + ") Let's sleep 10 seconds to wait for (possible) other clients");
Thread.sleep(10000);
/*
if (conf.getUseSignatures()==1){
......@@ -92,7 +92,6 @@ public class ThroughputTestClient extends TOMSender implements Runnable {
System.out.println("Sending requests ...");
long startTimeInstant = System.currentTimeMillis();
for (int i = 0; i < exec; i++) {
// if (conf.getUseSignatures()==0){
int currId = currentId;
byte[] command = new byte[4 + argSize];
ByteArrayOutputStream out = new ByteArrayOutputStream(4);
......@@ -103,10 +102,6 @@ public class ThroughputTestClient extends TOMSender implements Runnable {
}
System.arraycopy(out.toByteArray(), 0, command, 0, 4);
this.TOMulticast(command);
// }
// else {
// this.doTOMulticast(generatedMessages.get(i));
// }
if ((i!=0) && ((i % 1000) == 0)) {
long elapsedTime = System.currentTimeMillis() - startTimeInstant;
double opsPerSec_ = ((double)1000)/(((double)elapsedTime/1000));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册