提交 3427462a 编写于 作者: J João Sousa

...

上级 523fead3
无法预览此类型文件
......@@ -31,6 +31,8 @@ import bftsmart.tom.leaderchange.RequestsTimer;
import bftsmart.tom.server.RequestVerifier;
import bftsmart.tom.util.TOMUtil;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,6 +54,9 @@ public class ClientsManager {
private ServerCommunicationSystem cs;
private ReentrantLock clientsLock = new ReentrantLock();
private LinkedBlockingQueue<TOMMessage> ackQueue;
private Thread ackReplier;
public ClientsManager(ServerViewController controller, RequestsTimer timer, RequestVerifier verifier,
ServerCommunicationSystem cs, ExecutionManager manager, DeliveryThread dt) {
......@@ -62,6 +67,25 @@ public class ClientsManager {
this.cs = cs;
this.manager = manager;
this.dt = dt;
ackQueue = new LinkedBlockingQueue<>();
ackReplier = new Thread() {
public void run() {
while (true) {
try {
TOMMessage request = ackQueue.take();
cs.send(new int[]{request.getSender()}, request.reply);
} catch (InterruptedException ex) {
java.util.logging.Logger.getLogger(ClientsManager.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
};
ackReplier.start();
}
/**
......@@ -447,24 +471,12 @@ public class ClientsManager {
TOMMessage ack = new TOMMessage(controller.getStaticConf().getProcessId(), request.getSession(), request.getSequence(),
request.getOperationId(), buff.array(), request.getViewID(), TOMMessageType.ACK);
//Launch another thread to not interfere with the netty worker
Thread t = new Thread() {
public void run() {
MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(),
request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature,
System.currentTimeMillis(), 0, 0, -1, -1, -1, null, null, false);
request.reply = ack;
request.msgCtx = msgCtx;
//cs.send(new int[]{request.getSender()}, ack);
dt.getReplyManager().send(request);
}
};
t.start();
request.reply = ack;
try {
ackQueue.put(request);
} catch (InterruptedException ex) {
java.util.logging.Logger.getLogger(ClientsManager.class.getName()).log(Level.SEVERE, null, ex);
}
request.ackSent = true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册