diff --git a/bin/BFT-SMaRt.jar b/bin/BFT-SMaRt.jar index 5f7323344cd80e228d5137d8762aa5b6e1bfac04..a9f3cab4b0eaedada2aebe86a39d02cd5bc23841 100644 Binary files a/bin/BFT-SMaRt.jar and b/bin/BFT-SMaRt.jar differ diff --git a/src/bftsmart/clientsmanagement/ClientsManager.java b/src/bftsmart/clientsmanagement/ClientsManager.java index 3fc85f3ef6f426c5c37fc0f39d3112bc7a7ac27f..8362afb33c05d0cbe91854e99bce2883f9ece444 100644 --- a/src/bftsmart/clientsmanagement/ClientsManager.java +++ b/src/bftsmart/clientsmanagement/ClientsManager.java @@ -31,6 +31,8 @@ import bftsmart.tom.leaderchange.RequestsTimer; import bftsmart.tom.server.RequestVerifier; import bftsmart.tom.util.TOMUtil; import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,9 @@ public class ClientsManager { private ServerCommunicationSystem cs; private ReentrantLock clientsLock = new ReentrantLock(); + + private LinkedBlockingQueue ackQueue; + private Thread ackReplier; public ClientsManager(ServerViewController controller, RequestsTimer timer, RequestVerifier verifier, ServerCommunicationSystem cs, ExecutionManager manager, DeliveryThread dt) { @@ -62,6 +67,25 @@ public class ClientsManager { this.cs = cs; this.manager = manager; this.dt = dt; + + ackQueue = new LinkedBlockingQueue<>(); + ackReplier = new Thread() { + + public void run() { + + while (true) { + + try { + TOMMessage request = ackQueue.take(); + cs.send(new int[]{request.getSender()}, request.reply); + } catch (InterruptedException ex) { + java.util.logging.Logger.getLogger(ClientsManager.class.getName()).log(Level.SEVERE, null, ex); + } + } + } + }; + + ackReplier.start(); } /** @@ -447,24 +471,12 @@ public class ClientsManager { TOMMessage ack = new TOMMessage(controller.getStaticConf().getProcessId(), request.getSession(), request.getSequence(), request.getOperationId(), buff.array(), request.getViewID(), TOMMessageType.ACK); - //Launch another thread to not interfere with the netty worker - Thread t = new Thread() { - - public void run() { - - MessageContext msgCtx = new MessageContext(request.getSender(), request.getViewID(), request.getReqType(), - request.getSession(), request.getSequence(), request.getOperationId(), request.getReplyServer(), request.serializedMessageSignature, - System.currentTimeMillis(), 0, 0, -1, -1, -1, null, null, false); - - request.reply = ack; - request.msgCtx = msgCtx; - - //cs.send(new int[]{request.getSender()}, ack); - dt.getReplyManager().send(request); - } - }; - - t.start(); + request.reply = ack; + try { + ackQueue.put(request); + } catch (InterruptedException ex) { + java.util.logging.Logger.getLogger(ClientsManager.class.getName()).log(Level.SEVERE, null, ex); + } request.ackSent = true; }