提交 0b752909 编写于 作者: L liquidsnake@sapo.pt

In this commit, it was fixed a bug on leader change messages related to the...

In this commit, it was fixed a bug on leader change messages related to the serialization of messages contained in the STOP message. It was also added a method in the BatchBuilder class, that makes it easer to create a batch and is also used during the leader change.
上级 53fdcd13
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package navigators.smart.reconfiguration;
/**
*
* @author Andre Nogueira
*/
public class TTPServices {
public static void main(String[] args) throws InterruptedException {
TTP ttp = new TTP();
if(args.length == 1){
System.out.println("####Tpp Service[Disjoint]####");
int smartId = Integer.parseInt(args[0]);
ttp.removeServer(smartId);
}else if(args.length == 3){
System.out.println("####Tpp Service[Join]####");
int smartId = Integer.parseInt(args[0]);
String ipAddress = args[1];
int port = Integer.parseInt(args[2]);
ttp.addServer(smartId, ipAddress,port);
}else{
System.out.println("Usage: java -jar TppServices <smart id> [ip address] [port]");
System.exit(1);
}
ttp.executeUpdates();
Thread.sleep(2000);
ttp.close();
System.exit(0);
}
}
......@@ -29,6 +29,7 @@ import java.security.MessageDigest;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignedObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
......@@ -347,29 +348,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
Logger.println("(TOMLayer.run) creating a PROPOSE with " + numberOfMessages + " msgs");
int totalMessageSize = 0; //total size of the messages being batched
byte[][] messages = new byte[numberOfMessages][]; //bytes of the message (or its hash)
byte[][] signatures = new byte[numberOfMessages][]; //bytes of the message (or its hash)
// Fill the array of bytes for the messages/signatures being batched
int i = 0;
for (Iterator<TOMMessage> li = pendingRequests.iterator(); li.hasNext(); i++) {
TOMMessage msg = li.next();
//Logger.println("(TOMLayer.run) adding req " + msg + " to PROPOSE");
messages[i] = msg.serializedMessage;
signatures[i] = msg.serializedMessageSignature;
totalMessageSize += messages[i].length;
}
// return the batch
return bb.createBatch(System.currentTimeMillis(), numberOfNonces, numberOfMessages, totalMessageSize,
this.reconfManager.getStaticConf().getUseSignatures() == 1, messages, signatures,this.reconfManager);
return bb.makeBatch(pendingRequests, numberOfNonces, System.currentTimeMillis(),reconfManager);
}
/**
* This is the main code for this thread. It basically waits until this replica becomes the leader,
* and when so, proposes a value to the other acceptors
......@@ -1362,14 +1342,20 @@ public final class TOMLayer extends Thread implements RequestReceiver {
if (lcManager.getCurrentRequestTimedOut() != null) {
//TODO: Se isto estiver a null, e porque nao houve timeout. Fazer o q?
byte[] msgs = bb.makeBatch(lcManager.getCurrentRequestTimedOut(), 0, 0, reconfManager);
List<TOMMessage> temp = lcManager.getCurrentRequestTimedOut();
out.writeBoolean(true);
out.writeObject(lcManager.getCurrentRequestTimedOut());
out.writeObject(msgs);
}
else {
out.writeBoolean(false);
}
byte[] payload = bos.toByteArray();
out.flush();
bos.flush();
out.close();
bos.close();
......@@ -1428,7 +1414,8 @@ public final class TOMLayer extends Thread implements RequestReceiver {
//TODO: Se isto estiver a null, e porque nao houve timeout. Fazer o q?
out.writeBoolean(true);
out.writeObject(lcManager.getCurrentRequestTimedOut());
byte[] msgs = bb.makeBatch(lcManager.getCurrentRequestTimedOut(), 0, 0, reconfManager);
out.writeObject(msgs);
}
else {
out.writeBoolean(false);
......@@ -1626,20 +1613,17 @@ public final class TOMLayer extends Thread implements RequestReceiver {
bis = new ByteArrayInputStream(msg.getPayload());
ois = new ObjectInputStream(bis);
boolean hasReq = ois.readBoolean();
boolean hasReqs = ois.readBoolean();
clientsManager.getClientsLock().lock();
if (hasReq) {
if (hasReqs) {
// Guardar os pedidos que a outra replica nao conseguiu ordenar
//TODO: Os requests tem q ser verificados!
List<TOMMessage> requests = (List<TOMMessage>) ois.readObject();
for (TOMMessage r : requests) {
clientsManager.requestReceived(r, false);
}
byte[] temp = (byte[]) ois.readObject();
BatchReader batchReader = new BatchReader(temp,
reconfManager.getStaticConf().getUseSignatures() == 1);
TOMMessage[] requests = batchReader.deserialiseRequests(reconfManager);
}
clientsManager.getClientsLock().unlock();
......
......@@ -19,8 +19,10 @@
package navigators.smart.tom.util;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Random;
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.core.messages.TOMMessage;
/**
* Batch format: TIMESTAMP(long) + N_NONCES(int) + SEED(long) +
......@@ -35,7 +37,7 @@ public final class BatchBuilder {
private Random rnd = new Random();
/** build buffer */
public byte[] createBatch(long timestamp, int numberOfNonces, int numberOfMessages, int totalMessagesSize, boolean useSignatures, byte[][] messages, byte[][] signatures, ReconfigurationManager manager) {
private byte[] createBatch(long timestamp, int numberOfNonces, int numberOfMessages, int totalMessagesSize, boolean useSignatures, byte[][] messages, byte[][] signatures, ReconfigurationManager manager) {
int size = 20 + //timestamp 8, nonces 4, nummessages 4
(numberOfNonces > 0 ? 8 : 0) + //seed if needed
(numberOfMessages*(4+(useSignatures?TOMUtil.getSignatureSize(manager):0)))+ // msglength + signature for each msg
......@@ -69,4 +71,29 @@ public final class BatchBuilder {
}
}
public byte[] makeBatch(Collection<TOMMessage> msgs, int numNounces, long timestamp, ReconfigurationManager reconfManager) {
int numMsgs = msgs.size();
int totalMessageSize = 0; //total size of the messages being batched
byte[][] messages = new byte[numMsgs][]; //bytes of the message (or its hash)
byte[][] signatures = new byte[numMsgs][]; //bytes of the message (or its hash)
// Fill the array of bytes for the messages/signatures being batched
int i = 0;
for (TOMMessage msg : msgs) {
//TOMMessage msg = msgs.next();
//Logger.println("(TOMLayer.run) adding req " + msg + " to PROPOSE");
messages[i] = msg.serializedMessage;
signatures[i] = msg.serializedMessageSignature;
totalMessageSize += messages[i].length;
i++;
}
// return the batch
return createBatch(timestamp, numNounces, numMsgs, totalMessageSize,
reconfManager.getStaticConf().getUseSignatures() == 1, messages, signatures,reconfManager);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册