提交 42a3422e 编写于 作者: M marcelmhs

- Added information about replicas connection in hosts.config;

- Finally added change in DeliveryThread to remove messages from the client queue right after it is delivered;
- Fixed bug in ServiceReplica regarding reconfiguration messages not being considered on multiple consensus delivery;
- Small fixes in BFTMap demo;
- Removed empty lines, commented code and useless comments.
上级 46d494e1
......@@ -12,6 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# This file defines the replicas ids, IPs and ports.
# It is used by the replicas and clients to find connection info
# to the initial replicas.
# The ports defined here are the ports used by clients to communicate
# with the replicas. Additional connections are opened by replicas to
# communicate with each other. This additional connection is opened in the
# next port defined here. For an example, consider the line "0 127.0.0.1 11000".
# That means that clients will open a communication channel to replica 0 in
# IP 127.0.0.1 and port 11000. On startup, replicas with id different than 0
# will open a communication channel to replica 0 in port 11001.
# The same holds for replicas 1, 2, 3 ... N.
#server id, address and port (the ids from 0 to n-1 are the service replicas)
0 127.0.0.1 11000
1 127.0.0.1 11010
......
......@@ -413,7 +413,6 @@ public class NettyClientServerCommunicationSystemClientSide extends SimpleChanne
ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>(sessionTable.values());
rl.readLock().unlock();
for (NettyClientServerSession ncss : sessions) {
ncss.getChannel().close();
}
}
......
......@@ -87,15 +87,10 @@ public class ServerConnection {
this.outQueue = new LinkedBlockingQueue<byte[]>(this.manager.getStaticConf().getOutQueueSize());
this.noMACs = new HashSet<Integer>();
//******* EDUARDO BEGIN **************//
// Connect to the remote process or just wait for the connection?
if (isToConnect()) {
//I have to connect to the remote server
try {
//System.out.println("**********");
//System.out.println(remoteId);
//System.out.println(this.manager.getStaticConf().getServerToServerPort(remoteId));
//System.out.println(this.manager.getStaticConf().getHost(remoteId));
this.socket = new Socket(this.manager.getStaticConf().getHost(remoteId),
this.manager.getStaticConf().getServerToServerPort(remoteId));
ServersCommunicationLayer.setSocketOptions(this.socket);
......@@ -108,7 +103,6 @@ public class ServerConnection {
}
}
//else I have to wait a connection from the remote server
//******* EDUARDO END **************//
if (this.socket != null) {
try {
......
......@@ -64,7 +64,6 @@ public class ServersCommunicationLayer extends Thread {
public ServersCommunicationLayer(ServerViewManager manager,
LinkedBlockingQueue<SystemMessage> inQueue, ServiceReplica replica) throws Exception {
//******* EDUARDO BEGIN **************//
this.manager = manager;
this.inQueue = inQueue;
this.me = manager.getStaticConf().getProcessId();
......@@ -75,7 +74,6 @@ public class ServersCommunicationLayer extends Thread {
int[] initialV = manager.getCurrentViewAcceptors();
for (int i = 0; i < initialV.length; i++) {
if (initialV[i] != me) {
//connections.put(initialV[i], new ServerConnection(manager, null, initialV[i], inQueue));
getConnection(initialV[i]);
}
}
......@@ -83,7 +81,6 @@ public class ServersCommunicationLayer extends Thread {
serverSocket = new ServerSocket(manager.getStaticConf().getServerToServerPort(
manager.getStaticConf().getProcessId()));
//******* EDUARDO END **************//
SecretKeyFactory fac = SecretKeyFactory.getInstance("PBEWithMD5AndDES");
PBEKeySpec spec = new PBEKeySpec(PASSWORD.toCharArray());
......
......@@ -51,8 +51,9 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public Map<String,byte[]> get(String tableName) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.GET);
new DataOutputStream(out).writeUTF(tableName);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.GET);
dos.writeUTF(tableName);
byte[] rep = KVProxy.invokeUnordered(out.toByteArray());
ByteArrayInputStream bis = new ByteArrayInputStream(rep) ;
......@@ -73,9 +74,10 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public byte[] getEntry(String tableName,String key) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.GET);
new DataOutputStream(out).writeUTF(tableName);
new DataOutputStream(out).writeUTF(key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.GET);
dos.writeUTF(tableName);
dos.writeUTF(key);
byte[] rep = KVProxy.invokeUnordered(out.toByteArray());
return rep;
} catch (IOException ex) {
......@@ -88,8 +90,9 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public Map<String,byte[]> put(String key, Map<String,byte[]> value) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.TAB_CREATE);
new DataOutputStream(out).writeUTF(key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.TAB_CREATE);
dos.writeUTF(key);
//ByteArrayOutputStream bos = new ByteArrayOutputStream() ;
ObjectOutputStream out1 = new ObjectOutputStream(out) ;
out1.writeObject(value);
......@@ -115,10 +118,11 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public byte[] putEntry(String tableName, String key, byte[] value) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.PUT);
new DataOutputStream(out).writeUTF(tableName);
new DataOutputStream(out).writeUTF(key);
new DataOutputStream(out).writeUTF(new String(value));
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.PUT);
dos.writeUTF(tableName);
dos.writeUTF(key);
dos.writeUTF(new String(value));
byte[] rep = KVProxy.invokeOrdered(out.toByteArray());
return rep;
} catch (IOException ex) {
......@@ -133,8 +137,9 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public Map<String,byte[]> remove(Object key) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.TAB_REMOVE);
new DataOutputStream(out).writeUTF((String) key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.TAB_REMOVE);
dos.writeUTF((String) key);
byte[] rep = KVProxy.invokeOrdered(out.toByteArray());
ByteArrayInputStream bis = new ByteArrayInputStream(rep) ;
......@@ -155,9 +160,10 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public byte[] removeEntry(String tableName,String key) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.REMOVE);
new DataOutputStream(out).writeUTF((String) tableName);
new DataOutputStream(out).writeUTF((String) key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.REMOVE);
dos.writeUTF((String) tableName);
dos.writeUTF((String) key);
byte[] rep = KVProxy.invokeOrdered(out.toByteArray());
return rep;
} catch (IOException ex) {
......@@ -184,8 +190,9 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public int size1(String tableName) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.SIZE);
new DataOutputStream(out).writeUTF(tableName);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.SIZE);
dos.writeUTF(tableName);
byte[] rep;
rep = KVProxy.invokeUnordered(out.toByteArray());
ByteArrayInputStream in = new ByteArrayInputStream(rep);
......@@ -200,8 +207,9 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public boolean containsKey(String key) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.TAB_CREATE_CHECK);
new DataOutputStream(out).writeUTF((String) key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.TAB_CREATE_CHECK);
dos.writeUTF((String) key);
byte[] rep;
rep = KVProxy.invokeUnordered(out.toByteArray());
ByteArrayInputStream in = new ByteArrayInputStream(rep);
......@@ -218,9 +226,10 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
public boolean containsKey1(String tableName, String key) {
try {
out = new ByteArrayOutputStream();
new DataOutputStream(out).writeInt(BFTMapRequestType.CHECK);
new DataOutputStream(out).writeUTF((String) tableName);
new DataOutputStream(out).writeUTF((String) key);
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(BFTMapRequestType.CHECK);
dos.writeUTF((String) tableName);
dos.writeUTF((String) key);
byte[] rep;
rep = KVProxy.invokeUnordered(out.toByteArray());
ByteArrayInputStream in = new ByteArrayInputStream(rep);
......@@ -271,7 +280,4 @@ public class BFTMap implements Map<String, Map<String,byte[]>> {
throw new UnsupportedOperationException("Not supported yet.");
}
}
}
\ No newline at end of file
......@@ -50,10 +50,10 @@ public class BFTMapClient {
// System.exit(1);
}
if(ops % 1000 == 0)
if(ops % 100 == 0)
System.out.println("ops sent: "+ops);
ops++;
Thread.sleep(10);
// Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -75,7 +75,7 @@ public class BFTMapServer extends DefaultSingleRecoverable {
String key = new DataInputStream(in).readUTF();
String value = new DataInputStream(in).readUTF();
byte[] valueBytes = value.getBytes();
// System.out.println("Key received: " + key);
System.out.println("Key received: " + key);
byte[] ret = tableMap.addData(tableName, key, valueBytes);
if (ret == null) {
// System.out.println("Return is null, so there was no data before");
......
......@@ -31,7 +31,7 @@ public class MessageContext {
private int consensusId;
private int sender;
private TOMMessage firstInBatch; //to be replaced by a statistics class
private int batchSize; // Used to inform the size of the batch in which the request was decided. Used for state logging.
private boolean lastInBatch; // indicates that the command is the last in the batch. Used for logging
public MessageContext(long timestamp, byte[] nonces, int regency, int consensusId, int sender, TOMMessage firstInBatch) {
this.timestamp = timestamp;
......@@ -91,12 +91,12 @@ public class MessageContext {
return firstInBatch;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public void setLastInBatch() {
lastInBatch = true;
}
public boolean isLastInBatch() {
return lastInBatch;
}
}
......@@ -238,7 +238,8 @@ public class ServiceReplica {
if (request.getReqType() == TOMMessageType.ORDERED_REQUEST) {
numRequests++;
MessageContext msgCtx = new MessageContext(firstRequest.timestamp, firstRequest.nonces, regency, consId[consensusCount], request.getSender(), firstRequest);
msgCtx.setBatchSize(requestsFromConsensus.length);
if(consensusCount + 1 == requestsFromConsensus.length)
msgCtx.setLastInBatch();
request.deliveryTime = System.nanoTime();
if(executor instanceof BatchExecutable) {
msgCtxts.add(msgCtx);
......
......@@ -34,10 +34,7 @@ public abstract class TOMSender implements ReplyReceiver {
private int me; // process id
//******* EDUARDO BEGIN **************//
//private int[] group; // group of replicas
private ClientViewManager viewManager;
//******* EDUARDO END **************//
private int session = 0; // session id
private int sequence = 0; // sequence number
......
......@@ -85,6 +85,11 @@ public final class DeliveryThread extends Thread {
try {
decidedLock.lock();
decided.put(cons);
// clean the ordered messages from the pending buffer
TOMMessage[] requests = extractMessagesFromDecision(cons);
tomLayer.clientsManager.requestsOrdered(requests);
notEmptyQueue.signalAll();
decidedLock.unlock();
Logger.println("(DeliveryThread.delivery) Consensus " + cons.getId() + " finished. Decided size=" + decided.size());
......@@ -138,8 +143,6 @@ public final class DeliveryThread extends Thread {
//be removed from the leaderManager and the executionManager
if (lastEid > 2) {
int stableConsensus = lastEid - 3;
//tomLayer.lm.removeStableMultipleConsenusInfos(lastCheckpointEid, stableConsensus);
tomLayer.execManager.removeOutOfContexts(stableConsensus);
}
......@@ -195,11 +198,6 @@ public final class DeliveryThread extends Thread {
Consensus lastConsensus = consensuses.get(consensuses.size() - 1);
if (requests != null && requests.length > 0) {
// clean the ordered messages from the pending buffer
for(int i = 0; i < requests.length; i++) {
tomLayer.clientsManager.requestsOrdered(requests[i]);
}
deliverMessages(consensusIds, tomLayer.getLCManager().getLastReg(), requests);
// ******* EDUARDO BEGIN ***********//
......@@ -258,7 +256,7 @@ public final class DeliveryThread extends Thread {
return requests;
}
public void deliverUnordered(TOMMessage request, int regency) {
protected void deliverUnordered(TOMMessage request, int regency) {
MessageContext msgCtx = new MessageContext(System.currentTimeMillis(),
new byte[0], regency, -1, request.getSender(), null);
receiver.receiveReadonlyMessage(request, msgCtx);
......@@ -282,16 +280,4 @@ public final class DeliveryThread extends Thread {
tomLayer.getCommunication().updateServersConnections();
}
private void logDecision(Consensus cons) {
if (manager.getStaticConf().getCheckpointPeriod() > 0) {
if ((cons.getId() > 0) && ((cons.getId() % manager.getStaticConf().getCheckpointPeriod()) == 0)) {
Logger.println("(DeliveryThread.run) Performing checkpoint for consensus " + cons.getId());
//byte[] state = receiver.getState();
//tomLayer.getStateManager().saveState(state, cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DeliveryThread.run) Storing message batch in the state log for consensus " + cons.getId());
//tomLayer.getStateManager().saveBatch(cons.getDecision(), cons.getId(), cons.getDecisionRound().getNumber(), tomLayer.lm.getCurrentLeader()/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
}
}
}
}
......@@ -17,6 +17,8 @@ package bftsmart.tom.server.defaultservices;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
......@@ -43,8 +45,7 @@ public abstract class DefaultSingleRecoverable implements Recoverable, SingleExe
private MessageDigest md;
private StateLog log;
private int messageCounter;
private byte[][] commands;
private List<byte[]> commands = new ArrayList<byte[]>();
private StateManager stateManager;
......@@ -66,13 +67,9 @@ public abstract class DefaultSingleRecoverable implements Recoverable, SingleExe
byte[] reply = appExecuteOrdered(command, msgCtx);
stateLock.unlock();
if(messageCounter == 0) { //first message of the batch
commands = new byte[msgCtx.getBatchSize()][];
}
commands[messageCounter] = command;
messageCounter++;
commands.add(command);
if(messageCounter == msgCtx.getBatchSize()) {
if(msgCtx.isLastInBatch()) {
if ((eid > 0) && ((eid % CHECKPOINT_PERIOD) == 0)) {
Logger.println("(DurabilityCoordinator.executeBatch) Performing checkpoint for consensus " + eid);
stateLock.lock();
......@@ -81,9 +78,9 @@ public abstract class DefaultSingleRecoverable implements Recoverable, SingleExe
saveState(snapshot, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DurabilityCoordinator.executeBatch) Storing message batch in the state log for consensus " + eid);
saveCommands(commands, eid, 0, 0);
saveCommands(commands.toArray(new byte[0][]), eid, 0, 0);
}
messageCounter = 0;
commands = new ArrayList<byte[]>();
}
return reply;
}
......
......@@ -17,6 +17,8 @@ package bftsmart.tom.server.defaultservices;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Condition;
......@@ -38,7 +40,7 @@ import bftsmart.tom.util.Logger;
*/
public abstract class FIFOExecutableRecoverable implements Recoverable, FIFOExecutable {
public static final int CHECKPOINT_PERIOD = 10000;
public static final int CHECKPOINT_PERIOD = 0;
private ReentrantLock logLock = new ReentrantLock();
private ReentrantLock hashLock = new ReentrantLock();
......@@ -47,26 +49,26 @@ public abstract class FIFOExecutableRecoverable implements Recoverable, FIFOExec
private MessageDigest md;
private StateLog log;
private int messageCounter;
private byte[][] commands;
private List<byte[]> commands = new ArrayList<byte[]>();
private StateManager stateManager;
private ConcurrentMap<Integer, Integer> clientOperations;
protected ConcurrentMap<Integer, Integer> clientOperations;
private Lock fifoLock = new ReentrantLock();
private Condition updatedState = fifoLock.newCondition();
public FIFOExecutableRecoverable() {
clientOperations = new ConcurrentHashMap<Integer, Integer>();
try {
md = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException ex) {
java.util.logging.Logger.getLogger(FIFOExecutableRecoverable.class.getName()).log(Level.SEVERE, null, ex);
}
log = new StateLog(CHECKPOINT_PERIOD);
if(CHECKPOINT_PERIOD > 0)
log = new StateLog(CHECKPOINT_PERIOD);
}
@Override
public byte[] executeOrderedFIFO(byte[] command, MessageContext msgCtx, int clientId, int operationId) {
int eid = msgCtx.getConsensusId();
......@@ -93,14 +95,9 @@ public abstract class FIFOExecutableRecoverable implements Recoverable, FIFOExec
updatedState.signalAll();
fifoLock.unlock();
if(messageCounter == 0) { //first message of the batch
commands = new byte[msgCtx.getBatchSize()][];
}
commands[messageCounter] = command;
messageCounter++;
commands.add(command);
if(messageCounter == msgCtx.getBatchSize()) {
if(CHECKPOINT_PERIOD > 0 && msgCtx.isLastInBatch()) {
if ((eid > 0) && ((eid % CHECKPOINT_PERIOD) == 0)) {
Logger.println("(DurabilityCoordinator.executeBatch) Performing checkpoint for consensus " + eid);
stateLock.lock();
......@@ -109,9 +106,9 @@ public abstract class FIFOExecutableRecoverable implements Recoverable, FIFOExec
saveState(snapshot, eid, 0, 0/*tomLayer.lm.getLeader(cons.getId(), cons.getDecisionRound().getNumber())*/);
} else {
Logger.println("(DurabilityCoordinator.executeBatch) Storing message batch in the state log for consensus " + eid);
saveCommands(commands, eid, 0, 0);
saveCommands(commands.toArray(new byte[0][]), eid, 0, 0);
}
messageCounter = 0;
commands = new ArrayList<byte[]>();
}
return reply;
}
......@@ -200,20 +197,13 @@ public abstract class FIFOExecutableRecoverable implements Recoverable, FIFOExec
@Override
public int setState(ApplicationState recvState) {
int lastEid = -1;
if (recvState instanceof DefaultApplicationState) {
DefaultApplicationState state = (DefaultApplicationState) recvState;
System.out.println("(DurabilityCoordinator.setState) last eid in state: " + state.getLastEid());
getLog().update(state);
int lastCheckpointEid = state.getLastCheckpointEid();
lastEid = state.getLastEid();
bftsmart.tom.util.Logger.println("(DurabilityCoordinator.setState) I'm going to update myself from EID "
+ lastCheckpointEid + " to EID " + lastEid);
......
......@@ -80,14 +80,11 @@ public class TOMUtil {
ObjectOutputStream obOut = null;
try {
obOut = new ObjectOutputStream(bOut);
obOut.writeObject(o);
obOut.flush();
bOut.flush();
obOut.close();
bOut.close();
} catch (IOException ex) {
ex.printStackTrace();
return null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册