提交 ca45a735 编写于 作者: L liquidsnake@sapo.pt

Small but important change in the communication system: when a connection to a...

Small but important change in the communication system: when a connection to a replica is lost, the OutQueue is erased. This is meant to avoid gaps in the sequence of messages upon reconnection, and thus, avoid the "break" of upper protocols.

Also, small refractor in TomUtil.
上级 93fb50f6
......@@ -60,12 +60,12 @@ public class MessageHandler {
case TOMUtil.STOP:
type = "STOP";
break;
case TOMUtil.STOPDATA:
type = "STOPDATA";
break;
case TOMUtil.SYNC:
type = "SYNC";
break;
case TOMUtil.CATCH_UP:
type = "CATCH_UP";
break;
}
......
......@@ -171,7 +171,9 @@ public class ServerConnection {
*/
private final void sendBytes(byte[] messageData) {
int i = 0;
boolean abort = false;
do {
if (abort) return; // if there is a need to reconnect, abort this method
if (socket != null && socketOutStream != null) {
try {
//do an extra copy of the data to be sent, but on a single out stream write
......@@ -191,9 +193,11 @@ public class ServerConnection {
} catch (IOException ex) {
closeSocket();
waitAndConnect();
abort = true;
}
} else {
waitAndConnect();
abort = true;
}
//br.ufsc.das.tom.util.Logger.println("(ServerConnection.sendBytes) iteration " + i);
i++;
......@@ -333,6 +337,7 @@ public class ServerConnection {
} catch (InterruptedException ie) {
}
outQueue.clear();
reconnect(null);
}
}
......
......@@ -693,7 +693,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
// enviar mensagem SYNC para o novo lider
communication.send(b,
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.SYNC, regency, payload));
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.STOPDATA, regency, payload));
//TODO: Voltar a ligar o timeout
......@@ -812,7 +812,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
}
}
break;
case TOMUtil.SYNC: // mensagens SYNC
case TOMUtil.STOPDATA: // mensagens STOPDATA
{
int regency = msg.getReg();
......@@ -893,7 +893,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
}
}
break;
case TOMUtil.CATCH_UP: // mensagens de CATCH-UP
case TOMUtil.SYNC: // mensagens SYNC
{
int regency = msg.getReg();
......@@ -997,7 +997,7 @@ public final class TOMLayer extends Thread implements RequestReceiver {
// enviar a mensagem CATCH-UP
communication.send(this.reconfManager.getCurrentViewOtherAcceptors(),
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.CATCH_UP, regency, payload));
new LCMessage(this.reconfManager.getStaticConf().getProcessId(), TOMUtil.SYNC, regency, payload));
finalise(regency, lastHighestEid, currentEid, signedCollects, propose, batchSize, true);
......
......@@ -39,8 +39,8 @@ public class TOMUtil {
public static final int RR_REPLY = 1;
public static final int RR_DELIVERED = 2;
public static final int STOP = 3;
public static final int SYNC = 4;
public static final int CATCH_UP = 5;
public static final int STOPDATA = 4;
public static final int SYNC = 5;
public static final int SM_REQUEST = 6;
public static final int SM_REPLY = 7;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册