提交 27d1ca6b 编写于 作者: J João Sousa

Transmission of replies is now always done by the ReplyManager, but using only a single thread

Server-side netty channels are now synchronous
Refined control flow mechanism to also oversee the number of pending replies waiting to arrive at clients.
上级 a2676efc
无法预览此类型文件
......@@ -53,6 +53,12 @@ system.communication.defaultkeys = true
#loopback address for the host machine, this parameter is overriden by that
system.communication.bindaddress = auto
#Timeout used at client-side in case a malicious replica refuses to acknowledge that a write to a netty channel has completed, in milliseconds
system.communication.clienttimeout = 1000
#Timeout used at replica-side in case a malicious client refuses to acknowledge that a write to a netty channel has completed, in milliseconds
system.communication.replicatimeout = 1000
############################################
### Replication Algorithm Configurations ###
############################################
......@@ -104,10 +110,6 @@ system.samebatchsize = false
#Set to 'macvector' to use MAC vectors in the consensus proof, 'signatures' to use digital signatures
system.totalordermulticast.prooftype = signatures
#Size of the thread pool that transmits replies to clients. If set to 0, no thread pool is used and this
#done sequentially by the delivery thread instead.
system.numrepliers = 10
#Timeout associated to each request invoked by a client, in miliseconds
system.totalordermulticast.invoketimeout = 40000
......@@ -132,6 +134,13 @@ system.controlflow.maxpendingdecs = 200
#it accepts requests again from clients. It must be lower than system.controlflow.maxpendingdecs
system.controlflow.preferredpendingdecs = 100
#Maximum number of total pending replies that each replica can hold. Set to -1 to disable.
system.controlflow.maxpendingreps = 2000
#Preferred number of total pending replies. Once each replica holds at most this many replies,
#it accepts requests again from clients. It must be lower than system.controlflow.maxpendingreps
system.controlflow.preferredpendingreps = 1000
#Maximum used memory that each replica should reach, in bytes. Set to -1 to disable.
system.controlflow.maxusedmemory = 524288000
......
......@@ -272,6 +272,7 @@ public class ClientsManager {
int pendingReqs = countPendingRequests();
int pendingDecs = dt.getPendingDecisions();
int pendingReps = dt.getPendingReplies();
long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
//control flow mechanism
......@@ -279,6 +280,7 @@ public class ClientsManager {
if ((this.controller.getStaticConf().getMaxPendigReqs() > 0 && pendingReqs >= this.controller.getStaticConf().getMaxPendigReqs()) ||
(this.controller.getStaticConf().getMaxPendigDecs() > 0 && pendingDecs >= this.controller.getStaticConf().getMaxPendigDecs()) ||
(this.controller.getStaticConf().getMaxPendigReps() > 0 && pendingReps >= this.controller.getStaticConf().getMaxPendigReps()) ||
(this.controller.getStaticConf().getMaxUsedMemory() > 0 && usedMemory >= this.controller.getStaticConf().getMaxUsedMemory()))
{
......@@ -286,6 +288,7 @@ public class ClientsManager {
} else if (pendingReqs <= this.controller.getStaticConf().getPreferredPendigReqs() &&
pendingDecs <= this.controller.getStaticConf().getPreferredPendigDecs() &&
pendingReps <= this.controller.getStaticConf().getPreferredPendigReps() &&
usedMemory <= this.controller.getStaticConf().getPreferredUsedMemory())
{
......@@ -298,11 +301,13 @@ public class ClientsManager {
usedMemory > this.controller.getStaticConf().getPreferredUsedMemory()) Runtime.getRuntime().gc(); // force garbage collection
logger.warn("Discarding message due to control flow mechanism\n" +
"\tMaximum requests are {}, current requests at {}\n" +
"\tMaximum decisions are {}, current decisions at {}\n" +
"\tMaximum requests are {}, pending requests at {}\n" +
"\tMaximum decisions are {}, pending decisions at {}\n" +
"\tMaximum replies are {}, pending replies at {}\n" +
"\tMaximum memory is {} current memory at {}\n",
this.controller.getStaticConf().getMaxPendigReqs(), pendingReqs,
this.controller.getStaticConf().getMaxPendigDecs(), pendingDecs,
this.controller.getStaticConf().getMaxPendigReps(), pendingReps,
TOMUtil.humanReadableByteCount(this.controller.getStaticConf().getMaxUsedMemory(), false),
TOMUtil.humanReadableByteCount(usedMemory, false));
......
......@@ -98,7 +98,7 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
SecretKeyFactory fac = TOMUtil.getSecretFactory();
this.controller = controller;
this.listener = new SyncListener();
this.listener = new SyncListener(this.controller.getStaticConf().getNettyClientTimeout());
//this.st = new Storage(BENCHMARK_PERIOD);
this.rl = new ReentrantReadWriteLock();
......@@ -513,61 +513,4 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
},time,TimeUnit.SECONDS);
}
private class SyncListener implements GenericFutureListener<ChannelFuture> {
private int remainingFutures;
private final Lock futureLock;
private final Condition enoughCompleted;
public SyncListener() {
this.remainingFutures = 0;
this.futureLock = new ReentrantLock();
this.enoughCompleted = futureLock.newCondition();
}
@Override
public void operationComplete(ChannelFuture f) {
this.futureLock.lock();
this.remainingFutures--;
if (this.remainingFutures <= 0) {
this.enoughCompleted.signalAll();
}
logger.debug(this.remainingFutures + " channel operations remaining to complete");
this.futureLock.unlock();
}
public void waitForChannels(int n) {
this.futureLock.lock();
if (this.remainingFutures > 0) {
logger.debug("There are still " + this.remainingFutures + " channel operations pending, waiting to complete");
try {
this.enoughCompleted.await(1000, TimeUnit.MILLISECONDS); // timeout if a malicous replica refuses to acknowledge the operation as completed
} catch (InterruptedException ex) {
logger.error("Interruption while waiting on condition", ex);
}
}
logger.debug("All channel operations completed or timed out");
this.remainingFutures = n;
this.futureLock.unlock();
}
}
}
......@@ -31,7 +31,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.security.NoSuchAlgorithmException;
......@@ -54,7 +53,6 @@ import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.util.TOMUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
/**
*
......@@ -71,6 +69,8 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
private ServerViewController controller;
private boolean closed = false;
private Channel mainChannel;
private SyncListener listener;
private int sending = 0;
// This locked seems to introduce a bottleneck and seems useless, but I cannot recall why I added it
//private ReentrantLock sendLock = new ReentrantLock();
......@@ -80,6 +80,7 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
try {
this.controller = controller;
this.listener = new SyncListener(this.controller.getStaticConf().getNettyReplicaTimeout());
sessionTable = new HashMap();
rl = new ReentrantReadWriteLock();
......@@ -264,7 +265,9 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
@Override
public void send(int[] targets, TOMMessage sm, boolean serializeClassHeaders) {
sending = 0;
//serialize message
DataOutputStream dos = null;
......@@ -340,6 +343,8 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
msg.destination = id;
//send message
session.writeAndFlush(msg);
sending++;
}
rl.readLock().unlock();
......@@ -359,10 +364,12 @@ public class NettyClientServerCommunicationSystemServerSide extends SimpleChanne
logger.warn("!!!!!!!!NettyClientServerSession is NULL !!!!!! sequence: " + sm.getSequence() + ", ID; " + targets[i]);
}
} finally {
//sendLock.unlock();
rl.readLock().unlock();
}
}
listener.waitForChannels(sending); // wait for the this transmission to complete
}
@Override
......
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package bftsmart.communication.client.netty;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Netty future listener used to make channel writes synchronous.
 *
 * <p>The same instance is meant to be registered on every write of a transmission.
 * Each completed operation decrements a counter of outstanding operations. A sender
 * calls {@link #waitForChannels(int)} to block until the counter left over from the
 * previous transmission reaches zero, or until the configured timeout expires, and
 * then arms the counter for the transmission it has just issued.
 *
 * <p>All state is guarded by {@code futureLock}.
 *
 * @author joao
 */
class SyncListener implements GenericFutureListener<ChannelFuture> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Maximum time to wait for outstanding operations, in milliseconds. */
    private final int timeout;

    /** Number of channel operations that have not been reported as complete yet. */
    private int remainingFutures;

    private final Lock futureLock;
    private final Condition enoughCompleted;

    /**
     * @param timeout maximum time, in milliseconds, that {@link #waitForChannels(int)}
     *                blocks while operations are still outstanding
     */
    public SyncListener(int timeout) {

        this.timeout = timeout;

        this.remainingFutures = 0;

        this.futureLock = new ReentrantLock();
        this.enoughCompleted = futureLock.newCondition();
    }

    /**
     * Invoked by netty when a channel operation finishes. Decrements the counter of
     * outstanding operations and wakes up any waiter once none remain.
     *
     * @param f the future of the operation that completed (not inspected)
     */
    @Override
    public void operationComplete(ChannelFuture f) {

        this.futureLock.lock();
        try {
            this.remainingFutures--;

            if (this.remainingFutures <= 0) {
                this.enoughCompleted.signalAll();
            }

            logger.debug(this.remainingFutures + " channel operations remaining to complete");
        } finally {
            // always release the lock, even if logging or signalling fails
            this.futureLock.unlock();
        }
    }

    /**
     * Blocks while operations are still outstanding, for at most the configured
     * timeout, and then sets the number of outstanding operations to {@code n}.
     *
     * <p>If the calling thread is interrupted, the wait ends early, the counter is
     * still reset to {@code n}, and the thread's interrupt status is restored.
     *
     * @param n number of channel operations issued by the caller that are now expected
     *          to be reported through {@link #operationComplete(ChannelFuture)}
     */
    public void waitForChannels(int n) {

        this.futureLock.lock();
        try {
            if (this.remainingFutures > 0) {

                logger.debug("There are still " + this.remainingFutures + " channel operations pending, waiting to complete");

                // timeout if a malicious process refuses to acknowledge the operation as completed
                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(this.timeout);

                try {
                    // loop so that a spurious wakeup does not end the wait before
                    // either the operations complete or the full timeout elapses
                    while (this.remainingFutures > 0 && remainingNanos > 0) {
                        remainingNanos = this.enoughCompleted.awaitNanos(remainingNanos);
                    }
                } catch (InterruptedException ex) {
                    logger.error("Interruption while waiting on condition", ex);
                    // preserve the interrupt for the caller instead of swallowing it
                    Thread.currentThread().interrupt();
                }
            }

            logger.debug("All channel operations completed or timed out");

            this.remainingFutures = n;
        } finally {
            this.futureLock.unlock();
        }
    }

}
......@@ -33,6 +33,8 @@ public class TOMConfiguration extends Configuration {
protected int batchTimeout;
protected int controlFlowTimeout;
protected int invokeTimeout;
protected int nettyClientTimeout;
protected int nettyReplicaTimeout;
protected int tomPeriod;
protected int paxosHighMark;
protected int revivalHighMark;
......@@ -43,6 +45,8 @@ public class TOMConfiguration extends Configuration {
protected int preferredPendingReqs;
protected int maxPendingDecs;
protected int preferredPendingDecs;
protected int maxPendingReps;
protected int preferredPendingReps;
protected int maxUsedMemory;
protected int preferredUsedMemory;
protected int numberOfNonces;
......@@ -57,7 +61,6 @@ public class TOMConfiguration extends Configuration {
private boolean stateTransferEnabled;
private int checkpointPeriod;
private int globalCheckpointPeriod;
private int useControlFlow;
private int[] initialView;
private int ttpId;
private boolean isToLog;
......@@ -67,7 +70,6 @@ public class TOMConfiguration extends Configuration {
private boolean isToWriteCkpsToDisk;
private boolean syncCkp;
private boolean isBFT;
private int numRepliers;
private int numNettyWorkers;
private boolean sameBatchSize;
private String bindAddress;
......@@ -135,6 +137,26 @@ public class TOMConfiguration extends Configuration {
}
}
s = (String) configs.remove("system.communication.clienttimeout");
if (s == null) {
nettyClientTimeout = 1000;
} else {
nettyClientTimeout = Integer.parseInt(s);
if (nettyClientTimeout <= 0) {
nettyClientTimeout = 1000;
}
}
// Replica-side netty write timeout, in milliseconds. The key must match the
// "system.communication.replicatimeout" entry of the configuration file; the
// previous spelling ("replicaimeout") never matched, so the configured value
// was silently ignored and the default of 1000 was always used.
s = (String) configs.remove("system.communication.replicatimeout");
if (s == null) {
    nettyReplicaTimeout = 1000;
} else {
    nettyReplicaTimeout = Integer.parseInt(s);
    // non-positive values are not usable as a timeout; fall back to the default
    if (nettyReplicaTimeout <= 0) {
        nettyReplicaTimeout = 1000;
    }
}
s = (String) configs.remove("system.totalordermulticast.highMark");
if (s == null) {
paxosHighMark = 10000;
......@@ -215,6 +237,24 @@ public class TOMConfiguration extends Configuration {
}
s = (String) configs.remove("system.controlflow.maxpendingreps");
if (s == null) {
maxPendingReps = 100000;
} else {
maxPendingReps = Integer.parseInt(s);
if (maxPendingReps <= 0) {
maxPendingReps = -1;
}
}
s = (String) configs.remove("system.controlflow.preferredpendingreps");
if (s == null) {
preferredPendingReps = 10000;
} else {
preferredPendingReps = Integer.parseInt(s);
}
s = (String) configs.remove("system.controlflow.maxusedmemory");
if (s == null) {
maxUsedMemory = 100000;
......@@ -299,13 +339,6 @@ public class TOMConfiguration extends Configuration {
checkpointPeriod = Integer.parseInt(s);
}
s = (String) configs.remove("system.communication.useControlFlow");
if (s == null) {
useControlFlow = 0;
} else {
useControlFlow = Integer.parseInt(s);
}
s = (String) configs.remove("system.initial.view");
if (s == null) {
initialView = new int[n];
......@@ -405,13 +438,6 @@ public class TOMConfiguration extends Configuration {
s = (String) configs.remove("system.bft");
isBFT = (s != null) ? Boolean.parseBoolean(s) : true;
s = (String) configs.remove("system.numrepliers");
if (s == null) {
numRepliers = 0;
} else {
numRepliers = Integer.parseInt(s);
}
s = (String) configs.remove("system.numnettyworkers");
if (s == null) {
......@@ -477,6 +503,14 @@ public class TOMConfiguration extends Configuration {
return invokeTimeout;
}
/**
 * Returns the timeout, in milliseconds, handed to the client-side netty
 * communication layer for waiting on channel writes to complete.
 * Read from "system.communication.clienttimeout"; it is 1000 when that key is
 * absent or holds a non-positive value.
 */
public int getNettyClientTimeout() {
return nettyClientTimeout;
}
/**
 * Returns the timeout, in milliseconds, handed to the server-side netty
 * communication layer for waiting on channel writes to complete.
 * It is 1000 when the corresponding configuration entry is absent or holds a
 * non-positive value.
 */
public int getNettyReplicaTimeout() {
return nettyReplicaTimeout;
}
public int getReplyVerificationTime() {
return replyVerificationTime;
}
......@@ -521,6 +555,14 @@ public class TOMConfiguration extends Configuration {
return preferredPendingDecs;
}
/**
 * Returns the maximum number of pending replies used by the control flow
 * mechanism, read from "system.controlflow.maxpendingreps".
 * Defaults to 100000 when the key is absent; a configured value that is zero
 * or negative is normalised to -1, which disables this limit.
 */
public int getMaxPendigReps() {
return maxPendingReps;
}
/**
 * Returns the preferred number of pending replies, read from
 * "system.controlflow.preferredpendingreps" (10000 when the key is absent).
 * Per the configuration file, once a replica holds at most this many pending
 * replies it accepts requests from clients again.
 */
public int getPreferredPendigReps() {
return preferredPendingReps;
}
public int getMaxUsedMemory() {
return maxUsedMemory;
}
......@@ -622,21 +664,10 @@ public class TOMConfiguration extends Configuration {
return globalCheckpointPeriod;
}
/**
* Indicates if a simple control flow mechanism should be used to avoid an overflow of client requests
*/
public int getUseControlFlow() {
return useControlFlow;
}
public boolean isBFT(){
return this.isBFT;
}
public int getNumRepliers() {
return numRepliers;
}
public int getNumNettyWorkers() {
return numNettyWorkers;
......
......@@ -487,7 +487,7 @@ public class AsynchServiceProxy extends ServiceProxy {
ackSeq++;
sm.setAckSeq(ackSeq);
logger.warn("Retrying invoke at client {} for request #{} ACK ack sequence #{}",
logger.warn("Retrying invoke at client {} for request #{} with ACK sequence #{}",
getViewManager().getStaticConf().getProcessId(), reqCtx.getOperationId(), sm.getAckSeq());
Arrays.fill(acks, null);
......
......@@ -146,6 +146,7 @@ public class ServiceReplica {
private void init() {
try {
cs = new ServerCommunicationSystem(this.SVController, this);
repMan = new ReplyManager(cs, replier);
} catch (Exception ex) {
logger.error("Failed to initialize replica-to-replica communication system", ex);
throw new RuntimeException("Unable to build a communication system.");
......@@ -191,7 +192,7 @@ public class ServiceReplica {
private void initReplica() {
cs.start();
repMan = new ReplyManager(SVController.getStaticConf().getNumRepliers(), cs);
repMan.start();
}
/**
......@@ -209,11 +210,9 @@ public class ServiceReplica {
message.getReplyServer() != this.id),message.getContent(), msgCtx);
if (response != null) {
if (SVController.getStaticConf().getNumRepliers() > 0) {
repMan.send(response);
} else {
cs.send(new int[]{response.getSender()}, response.reply);
}
repMan.send(response);
}
}
......@@ -339,7 +338,7 @@ public class ServiceReplica {
if (response != null) {
logger.debug("sending reply to " + response.getSender());
replier.manageReply(response, msgCtx);
repMan.send(response);
}
} else { //this code should never be executed
throw new UnsupportedOperationException("Non-existent interface");
......@@ -428,15 +427,9 @@ public class ServiceReplica {
if (replies != null) {
for (TOMMessage reply : replies) {
if (SVController.getStaticConf().getNumRepliers() > 0) {
logger.debug("Sending reply to " + reply.getSender() + " with sequence number " + reply.getSequence() + " and operation ID " + reply.getOperationId() +" via ReplyManager");
repMan.send(reply);
} else {
logger.debug("Sending reply to " + reply.getSender() + " with sequence number " + reply.getSequence() + " and operation ID " + reply.getOperationId());
replier.manageReply(reply, null);
//cs.send(new int[]{request.getSender()}, request.reply);
}
logger.debug("Sending reply to " + reply.getSender() + " with sequence number " + reply.getSequence() + " and operation ID " + reply.getOperationId());
repMan.send(reply);
}
}
//DEBUG
......@@ -520,4 +513,9 @@ public class ServiceReplica {
public int getId() {
return id;
}
public int getPendingReplies() {
return repMan.getPendingReplies();
}
}
......@@ -81,6 +81,10 @@ public final class DeliveryThread extends Thread {
return decided.size() + currentDecisions;
}
public int getPendingReplies() {
return receiver.getPendingReplies();
}
/**
* Invoked by the TOM layer, to deliver a decision
* @param dec Decision established from the consensus
......
......@@ -6,6 +6,7 @@ package bftsmart.tom.core;
import bftsmart.communication.ServerCommunicationSystem;
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.server.Replier;
import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -15,8 +16,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,48 +23,27 @@ import org.slf4j.LoggerFactory;
*
* @author joao
*/
public class ReplyManager {
public class ReplyManager extends Thread {
private LinkedList<ReplyThread> threads;
private int iteration;
public ReplyManager(int numThreads, ServerCommunicationSystem cs) {
this.threads = new LinkedList();
this.iteration = 0;
for (int i = 0; i < numThreads; i++) {
this.threads.add(new ReplyThread(cs));
}
for (ReplyThread t : threads)
t.start();
}
public void send (TOMMessage msg) {
iteration++;
threads.get((iteration % threads.size())).send(msg);
}
}
class ReplyThread extends Thread {
private LinkedBlockingQueue<TOMMessage> replies;
private int currentReplies = 0;
private boolean doWork = true;
private LinkedBlockingQueue<TOMMessage> replies = null;
private ServerCommunicationSystem cs = null;
private Replier replier = null;
private final Lock queueLock = new ReentrantLock();
private final Condition notEmptyQueue = queueLock.newCondition();
private Map<Integer, Channel> channels;
ReplyThread(ServerCommunicationSystem cs) {
public ReplyManager (ServerCommunicationSystem cs, Replier replier) {
this.cs = cs;
this.replies = new LinkedBlockingQueue<TOMMessage>();
this.channels = new HashMap<>();
this.replier = replier;
}
void send(TOMMessage msg) {
public void send(TOMMessage msg) {
try {
queueLock.lock();
......@@ -73,27 +51,44 @@ class ReplyThread extends Thread {
notEmptyQueue.signalAll();
queueLock.unlock();
} catch (InterruptedException ex) {
Logger.getLogger(ReplyThread.class.getName()).log(Level.SEVERE, null, ex);
LoggerFactory.getLogger(this.getClass()).error("Interruption error", ex);
}
}
public int getPendingReplies() {
return replies.size() + currentReplies;
}
public void run() {
while (true) {
while (doWork) {
try {
currentReplies = 0;
LinkedList<TOMMessage> list = new LinkedList<>();
queueLock.lock();
while (replies.isEmpty()) notEmptyQueue.await(10, TimeUnit.MILLISECONDS);
while (replies.isEmpty()) {
notEmptyQueue.await(10, TimeUnit.MILLISECONDS);
if (!doWork) break;
}
if (!doWork && replies.isEmpty()) {
queueLock.unlock();
break;
}
replies.drainTo(list);
queueLock.unlock();
currentReplies = list.size();
for (TOMMessage msg : list) {
cs.getClientsConn().send(new int[] {msg.getSender()}, msg.reply, false);
replier.manageReply(msg.reply, msg.msgCtx);
}
} catch (InterruptedException ex) {
LoggerFactory.getLogger(this.getClass()).error("Could not retrieve reply from queue",ex);
......
......@@ -25,6 +25,7 @@ import java.io.Externalizable;
import java.io.IOException;
import bftsmart.communication.SystemMessage;
import bftsmart.tom.MessageContext;
import bftsmart.tom.util.DebugInfo;
import org.slf4j.LoggerFactory;
......@@ -83,6 +84,7 @@ public class TOMMessage extends SystemMessage implements Externalizable, Compara
//the reply associated with this message
public transient TOMMessage reply = null;
public transient MessageContext msgCtx = null;
public transient boolean alreadyProposed = false;
private int replyServer = -1;
......
......@@ -46,6 +46,8 @@ public interface Executable {
TOMMessage reply = msgCtx.recreateTOMMessage(command);
reply.reply = new TOMMessage(processID, reply.getSession(), reply.getSequence(), reply.getOperationId(),
result, viewID, reply.getReqType());
reply.msgCtx = msgCtx;
return reply;
}
......
......@@ -38,7 +38,7 @@ public interface Replier {
* @param reply The reply produced for the executed request
* @param msgCtx The message context associated to the request
*/
public void manageReply(TOMMessage request, MessageContext msgCtx);
public void manageReply(TOMMessage reply, MessageContext msgCtx);
}
......@@ -35,7 +35,7 @@ public class DefaultReplier implements Replier{
private ReplicaContext rc;
@Override
public void manageReply(TOMMessage request, MessageContext msgCtx) {
public void manageReply(TOMMessage reply, MessageContext msgCtx) {
while (rc == null) {
......@@ -53,7 +53,7 @@ public class DefaultReplier implements Replier{
}
}
rc.getServerCommunicationSystem().send(new int[]{request.getSender()}, request.reply);
rc.getServerCommunicationSystem().send(new int[]{msgCtx.getSender()}, reply);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册