提交 e59ca445 编写于 作者: J João Sousa

Fixed a mistake when evaluating the time elapsed since a request was received...

Fixed a mistake when evaluating the time elapsed since a request was received for the leader change protocol (time units were in nanoseconds but evaluated as milliseconds).
Fixed a bug in the leader change protocol that occurred if the leader crashed and the request timeout triggered without any expired requests, which would make the system block.
Edited the old LC_MSG log message into a more developer-friendly format. It is now also only output if the message comes from another replica.
上级 265a6c67
无法预览此类型文件
......@@ -223,6 +223,7 @@ public class ClientsManager {
if (controller.getStaticConf().isBFT() && !verifier.isValidRequest(request)) return false;
request.receptionTime = System.nanoTime();
request.receptionTimestamp = System.currentTimeMillis();
int clientId = request.getSender();
boolean accounted = false;
......
......@@ -15,7 +15,6 @@ limitations under the License.
*/
package bftsmart.communication;
import bftsmart.communication.server.ServerConnection;
import bftsmart.consensus.messages.MessageFactory;
import bftsmart.consensus.messages.ConsensusMessage;
import bftsmart.consensus.roles.Acceptor;
......@@ -47,7 +46,6 @@ public class MessageHandler {
private Acceptor acceptor;
private TOMLayer tomLayer;
//private Cipher cipher;
private Mac mac;
public MessageHandler() {
......@@ -140,7 +138,9 @@ public class MessageHandler {
break;
}
logger.info("LC_MSG received: type " + type + ", regency " + lcMsg.getReg() + ", (replica " + lcMsg.getSender() + ")");
if (lcMsg.getReg() != -1 && lcMsg.getSender() != -1)
logger.info("Received leader change message of type {} for regency {} from replica {}", type, lcMsg.getReg(), lcMsg.getSender());
else logger.debug("Received leader change message from myself");
if (lcMsg.TRIGGER_LC_LOCALLY) tomLayer.requestsTimer.run_lc_protocol();
else tomLayer.getSynchronizer().deliverTimeoutRequest(lcMsg);
/**************************************************************/
......
......@@ -57,8 +57,10 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
public transient int destination = -1; // message destination
public transient boolean signed = false; // is this message signed?
public transient long receptionTime;//the reception time of this message
public transient boolean timeout = false;//this message was timed out?
public transient long receptionTime;//the reception time of this message (nanoseconds)
public transient long receptionTimestamp;//the reception timestamp of this message (miliseconds)
public transient boolean timeout = false;//this message was timed out?
public transient boolean recvFromClient = false; // Did the client already sent this message to me, or did it arrived in the batch?
......@@ -219,7 +221,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
/**
 * Returns a short textual identifier for this message built from its
 * sender, session and sequence fields.
 */
@Override
public String toString() {
// NOTE(review): this is a diff view without +/- markers. The first return
// appears to be the pre-change format "(sender,sequence,operationId,session)"
// and the second its replacement "[sender:session:sequence]" — confirm
// against the committed file; only one of the two exists in real source.
return "(" + sender + "," + sequence + "," + operationId + "," + session + ")";
return "[" + sender + ":" + session + ":" + sequence + "]";
}
public void wExternal(DataOutput out) throws IOException {
......
......@@ -77,14 +77,6 @@ public class RequestsTimer {
this.shortTimeout = shortTimeout;
}
/**
 * Replaces the timeout value stored in this timer.
 *
 * @param timeout the new timeout value; NOTE(review): presumably
 *                milliseconds, since the value is later used as a
 *                scheduling delay — confirm against the timer in use
 */
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
 * Returns the timeout value currently stored in this timer.
 *
 * @return the value of the {@code timeout} field
 */
public long getTimeout() {
return timeout;
}
public void startTimer() {
if (rtTask == null) {
long t = (shortTimeout > -1 ? shortTimeout : timeout);
......@@ -157,25 +149,33 @@ public class RequestsTimer {
//System.out.println("(RequestTimerTask.run) I SOULD NEVER RUN WHEN THERE IS NO TIMEOUT");
LinkedList<TOMMessage> pendingRequests = new LinkedList<TOMMessage>();
LinkedList<TOMMessage> pendingRequests = new LinkedList<>();
rwLock.readLock().lock();
try {
for (Iterator<TOMMessage> i = watched.iterator(); i.hasNext();) {
TOMMessage request = i.next();
if ((request.receptionTime + System.currentTimeMillis()) > t) {
pendingRequests.add(request);
} else {
break;
rwLock.readLock().lock();
for (Iterator<TOMMessage> i = watched.iterator(); i.hasNext();) {
TOMMessage request = i.next();
if ((System.currentTimeMillis() - request.receptionTimestamp ) > t) {
pendingRequests.add(request);
}
}
} finally {
rwLock.readLock().unlock();
}
rwLock.readLock().unlock();
if (!pendingRequests.isEmpty()) {
logger.info("The following requests timed out: " + pendingRequests);
for (ListIterator<TOMMessage> li = pendingRequests.listIterator(); li.hasNext(); ) {
TOMMessage request = li.next();
if (!request.timeout) {
logger.info("Forwarding requests {} to leader", request);
request.signed = request.serializedMessageSignature != null;
tomLayer.forwardRequestToLeader(request);
......@@ -185,7 +185,7 @@ public class RequestsTimer {
}
if (!pendingRequests.isEmpty()) {
logger.info("Timeout for messages: " + pendingRequests);
logger.info("Attempting to start leader change for requests {}", pendingRequests);
//Logger.debug = true;
//tomLayer.requestTimeout(pendingRequests);
//if (reconfManager.getStaticConf().getProcessId() == 4) Logger.debug = true;
......@@ -196,8 +196,11 @@ public class RequestsTimer {
timer.schedule(rtTask, t);
}
} else {
rtTask = null;
timer.purge();
logger.debug("Timeout triggered with no expired requests");
rtTask = new RequestTimerTask();
timer.schedule(rtTask, t);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册