From 0b7529094f418fb3988cb404429df975665dc165 Mon Sep 17 00:00:00 2001 From: "liquidsnake@sapo.pt" Date: Thu, 1 Sep 2011 16:17:14 +0000 Subject: [PATCH] In this commit, it was fixed a bug on leader change messages related to the serialization of messages contained in the STOP message. It was also added a method in the BatchBuilder class, that makes it easer to create a batch and is also used during the leader change. --- .../smart/reconfiguration/TTPServices.java | 48 ++++++++++++++++++ src/navigators/smart/tom/core/TOMLayer.java | 50 +++++++------------ .../smart/tom/util/BatchBuilder.java | 29 ++++++++++- 3 files changed, 93 insertions(+), 34 deletions(-) create mode 100644 src/navigators/smart/reconfiguration/TTPServices.java diff --git a/src/navigators/smart/reconfiguration/TTPServices.java b/src/navigators/smart/reconfiguration/TTPServices.java new file mode 100644 index 00000000..57cd2c58 --- /dev/null +++ b/src/navigators/smart/reconfiguration/TTPServices.java @@ -0,0 +1,48 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package navigators.smart.reconfiguration; + +/** + * + * @author Andre Nogueira + */ + +public class TTPServices { + public static void main(String[] args) throws InterruptedException { + + TTP ttp = new TTP(); + + + if(args.length == 1){ + System.out.println("####Tpp Service[Disjoint]####"); + + int smartId = Integer.parseInt(args[0]); + + ttp.removeServer(smartId); + + }else if(args.length == 3){ + System.out.println("####Tpp Service[Join]####"); + + int smartId = Integer.parseInt(args[0]); + String ipAddress = args[1]; + int port = Integer.parseInt(args[2]); + + ttp.addServer(smartId, ipAddress,port); + + }else{ + System.out.println("Usage: java -jar TppServices [ip address] [port]"); + System.exit(1); + } + + ttp.executeUpdates(); + + Thread.sleep(2000); + + ttp.close(); + + System.exit(0); + } +} diff --git a/src/navigators/smart/tom/core/TOMLayer.java b/src/navigators/smart/tom/core/TOMLayer.java index 7d682748..0e7edf55 100644 --- a/src/navigators/smart/tom/core/TOMLayer.java +++ b/src/navigators/smart/tom/core/TOMLayer.java @@ -29,6 +29,7 @@ import java.security.MessageDigest; import java.security.PrivateKey; import java.security.Signature; import java.security.SignedObject; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -347,29 +348,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { Logger.println("(TOMLayer.run) creating a PROPOSE with " + numberOfMessages + " msgs"); - int totalMessageSize = 0; //total size of the messages being batched - - byte[][] messages = new byte[numberOfMessages][]; //bytes of the message (or its hash) - byte[][] signatures = new byte[numberOfMessages][]; //bytes of the message (or its hash) - - // Fill the array of bytes for the messages/signatures being batched - int i = 0; - for (Iterator li = pendingRequests.iterator(); li.hasNext(); i++) { - TOMMessage msg = li.next(); - //Logger.println("(TOMLayer.run) adding req " + msg + " to PROPOSE"); - messages[i] = msg.serializedMessage; - signatures[i] = msg.serializedMessageSignature; - - totalMessageSize += messages[i].length; - } - - // return the batch - return bb.createBatch(System.currentTimeMillis(), numberOfNonces, numberOfMessages, totalMessageSize, - this.reconfManager.getStaticConf().getUseSignatures() == 1, messages, signatures,this.reconfManager); + return bb.makeBatch(pendingRequests, numberOfNonces, System.currentTimeMillis(),reconfManager); } - - - /** * This is the main code for this thread. It basically waits until this replica becomes the leader, * and when so, proposes a value to the other acceptors @@ -1362,14 +1342,20 @@ public final class TOMLayer extends Thread implements RequestReceiver { if (lcManager.getCurrentRequestTimedOut() != null) { //TODO: Se isto estiver a null, e porque nao houve timeout. Fazer o q? + byte[] msgs = bb.makeBatch(lcManager.getCurrentRequestTimedOut(), 0, 0, reconfManager); + List temp = lcManager.getCurrentRequestTimedOut(); out.writeBoolean(true); - out.writeObject(lcManager.getCurrentRequestTimedOut()); + out.writeObject(msgs); } else { out.writeBoolean(false); } byte[] payload = bos.toByteArray(); + + out.flush(); + bos.flush(); + out.close(); bos.close(); @@ -1428,7 +1414,8 @@ public final class TOMLayer extends Thread implements RequestReceiver { //TODO: Se isto estiver a null, e porque nao houve timeout. Fazer o q? out.writeBoolean(true); - out.writeObject(lcManager.getCurrentRequestTimedOut()); + byte[] msgs = bb.makeBatch(lcManager.getCurrentRequestTimedOut(), 0, 0, reconfManager); + out.writeObject(msgs); } else { out.writeBoolean(false); @@ -1626,20 +1613,17 @@ public final class TOMLayer extends Thread implements RequestReceiver { bis = new ByteArrayInputStream(msg.getPayload()); ois = new ObjectInputStream(bis); - boolean hasReq = ois.readBoolean(); + boolean hasReqs = ois.readBoolean(); clientsManager.getClientsLock().lock(); - if (hasReq) { + if (hasReqs) { // Guardar os pedidos que a outra replica nao conseguiu ordenar //TODO: Os requests tem q ser verificados! - List requests = (List) ois.readObject(); - - for (TOMMessage r : requests) { - - clientsManager.requestReceived(r, false); - } - + byte[] temp = (byte[]) ois.readObject(); + BatchReader batchReader = new BatchReader(temp, + reconfManager.getStaticConf().getUseSignatures() == 1); + TOMMessage[] requests = batchReader.deserialiseRequests(reconfManager); } clientsManager.getClientsLock().unlock(); diff --git a/src/navigators/smart/tom/util/BatchBuilder.java b/src/navigators/smart/tom/util/BatchBuilder.java index d9877af2..36618542 100644 --- a/src/navigators/smart/tom/util/BatchBuilder.java +++ b/src/navigators/smart/tom/util/BatchBuilder.java @@ -19,8 +19,10 @@ package navigators.smart.tom.util; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Random; import navigators.smart.reconfiguration.ReconfigurationManager; +import navigators.smart.tom.core.messages.TOMMessage; /** * Batch format: TIMESTAMP(long) + N_NONCES(int) + SEED(long) + @@ -35,7 +37,7 @@ public final class BatchBuilder { private Random rnd = new Random(); /** build buffer */ - public byte[] createBatch(long timestamp, int numberOfNonces, int numberOfMessages, int totalMessagesSize, boolean useSignatures, byte[][] messages, byte[][] signatures, ReconfigurationManager manager) { + private byte[] createBatch(long timestamp, int numberOfNonces, int numberOfMessages, int totalMessagesSize, boolean useSignatures, byte[][] messages, byte[][] signatures, ReconfigurationManager manager) { int size = 20 + //timestamp 8, nonces 4, nummessages 4 (numberOfNonces > 0 ? 8 : 0) + //seed if needed (numberOfMessages*(4+(useSignatures?TOMUtil.getSignatureSize(manager):0)))+ // msglength + signature for each msg @@ -69,4 +71,29 @@ public final class BatchBuilder { } } + public byte[] makeBatch(Collection msgs, int numNounces, long timestamp, ReconfigurationManager reconfManager) { + + int numMsgs = msgs.size(); + int totalMessageSize = 0; //total size of the messages being batched + + byte[][] messages = new byte[numMsgs][]; //bytes of the message (or its hash) + byte[][] signatures = new byte[numMsgs][]; //bytes of the message (or its hash) + + // Fill the array of bytes for the messages/signatures being batched + int i = 0; + for (TOMMessage msg : msgs) { + //TOMMessage msg = msgs.next(); + //Logger.println("(TOMLayer.run) adding req " + msg + " to PROPOSE"); + messages[i] = msg.serializedMessage; + signatures[i] = msg.serializedMessageSignature; + + totalMessageSize += messages[i].length; + i++; + } + + // return the batch + return createBatch(timestamp, numNounces, numMsgs, totalMessageSize, + reconfManager.getStaticConf().getUseSignatures() == 1, messages, signatures,reconfManager); + + } } -- GitLab