提交 ff5f027a 编写于 作者: J João Sousa

Fixed a bug on the client that would make the virtual machine exaust the heap...

Fixed a bug on the client that would make the virtual machine exaust the heap space if asynchronous clients aggressively sent requests to the servers.
上级 c272c430
......@@ -28,6 +28,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
......@@ -44,6 +45,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
......@@ -56,7 +65,6 @@ import bftsmart.reconfiguration.ClientViewController;
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.util.Logger;
import bftsmart.tom.util.TOMUtil;
import java.util.Arrays;
/**
*
......@@ -78,6 +86,8 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
private boolean closed = false;
private EventLoopGroup workerGroup;
private SyncListener listener;
public NettyClientServerCommunicationSystemClientSide(int clientId, ClientViewController controller) {
super();
......@@ -88,6 +98,8 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
SecretKeyFactory fac = SecretKeyFactory.getInstance("PBEWithMD5AndDES");
this.controller = controller;
this.listener = new SyncListener();
//this.st = new Storage(BENCHMARK_PERIOD);
this.rl = new ReentrantReadWriteLock();
signatureLength = TOMUtil.getSignatureSize(controller);
......@@ -322,8 +334,10 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
@Override
public void send(boolean sign, int[] targets, TOMMessage sm) {
listener.waitForChannels(); // wait for the previous transmission to complete
Logger.println("Sending request from " + sm.getSender() + " with sequence number " + sm.getSequence() + " to " + Arrays.toString(targets));
if (sm.serializedMessage == null) {
//serialize message
......@@ -352,7 +366,7 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
sm.serializedMessageSignature = signMessage(
controller.getStaticConf().getRSAPrivateKey(), sm.serializedMessage);
}
int sent = 0;
for (int i = targets.length - 1; i >= 0; i--) {
sm.destination = targets[i];
......@@ -362,7 +376,11 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
rl.readLock().unlock();
if (channel.isActive()) {
sm.signed = sign;
channel.writeAndFlush(sm);
ChannelFuture f = channel.writeAndFlush(sm);
listener.addChanFuture(f);
f.addListener(listener);
sent++;
} else {
Logger.println("Channel to " + targets[i] + " is not connected");
......@@ -498,4 +516,73 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
},time,TimeUnit.SECONDS);
}
private class SyncListener implements GenericFutureListener<ChannelFuture> {
private final Set<ChannelFuture> futures;
private int remainingFutures;
private final Lock futureLock;
private final Condition enoughCompleted;
public SyncListener() {
this.futures = Collections.synchronizedSet(new HashSet<ChannelFuture>());
this.remainingFutures = 0;
this.futureLock = new ReentrantLock();
this.enoughCompleted = futureLock.newCondition();
}
public void addChanFuture(ChannelFuture f) {
this.futures.add(f);
}
@Override
public void operationComplete(ChannelFuture f) {
this.futureLock.lock();
if (this.futures.contains(f)) {
this.remainingFutures--;
this.futures.remove(f);
if (this.remainingFutures <= controller.getCurrentViewF()) {
//if (this.remainingFutures == 0) { // this would make the client wait for all replicas to receive the msg
this.enoughCompleted.signalAll();
}
}
this.futureLock.unlock();
}
public void waitForChannels() {
this.futureLock.lock();
if (this.remainingFutures > controller.getCurrentViewF()) {
//if (this.remainingFutures > 0) { // this would make the client wait for all replicas to receive the msg
try {
this.enoughCompleted.await();
} catch (InterruptedException ex) {
java.util.logging.Logger.getLogger(NettyClientServerCommunicationSystemClientSide.class.getName()).log(Level.SEVERE, null, ex);
}
}
this.remainingFutures = controller.getCurrentViewN();
this.futures.clear();
this.futureLock.unlock();
}
}
}
......@@ -91,7 +91,7 @@ public final class DeliveryThread extends Thread {
// clean the ordered messages from the pending buffer
TOMMessage[] requests = extractMessagesFromDecision(dec);
tomLayer.clientsManager.requestsOrdered(requests);
tomLayer.clientsManager.requestsOrdered(requests);
notEmptyQueue.signalAll();
decidedLock.unlock();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册