diff --git a/src/navigators/smart/tom/ServiceProxy.java b/src/navigators/smart/tom/ServiceProxy.java index 740289c846664f0788f14539f93ec5eec471920f..51793d28849197bedaf5eae2ecb78f10c5295a16 100644 --- a/src/navigators/smart/tom/ServiceProxy.java +++ b/src/navigators/smart/tom/ServiceProxy.java @@ -19,25 +19,28 @@ package navigators.smart.tom; //import br.ufsc.das.communication.SimpleCommunicationSystem; import java.util.Arrays; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.ListIterator; import java.util.concurrent.Semaphore; import navigators.smart.reconfiguration.ReconfigurationManager; import navigators.smart.reconfiguration.ReconfigureReply; import navigators.smart.reconfiguration.View; import navigators.smart.tom.core.messages.TOMMessage; +import navigators.smart.tom.util.Extractor; +import navigators.smart.tom.util.Logger; import navigators.smart.tom.util.TOMUtil; + /** - * This class implements a TOMReplyReceiver, and a proxy to be used on the client side of the application. - * It sends a request to the replicas, receives the reply, and delivers it to the - * application - * + * This class implements a TOMSender and represents a proxy to be used on the + * client side of the replicated system. + * It sends a request to the replicas, receives the reply, and delivers it to + * the application. */ public class ServiceProxy extends TOMSender { //******* EDUARDO BEGIN **************// - // private int n; // Number of total replicas in the system - // private int f; // Number of maximum faulty replicas assumed to occur - // Semaphores used to render this object thread-safe // TODO: o mutex nao poderia ser antes um reentrantlock, em vez de um semaforo? //Eduardo: Acho que pode até ser um reentrantlock, mas apenas pode ser liberado (no caso do semaforo é a mesma coisa) @@ -45,36 +48,69 @@ public class ServiceProxy extends TOMSender { private Semaphore mutex = new Semaphore(1); private Semaphore mutexToSend = new Semaphore(1); //******* EDUARDO END **************// - private Semaphore sm = new Semaphore(0); private int reqId = -1; // request id + private int replyQuorum = 0; // size of the reply quorum private TOMMessage replies[] = null; // Replies from replicas are stored here - private TOMMessage response = null; // Pointer to the reply that is actually delivered to the application + private int receivedReplies = 0; // Number of received replies + private TOMMessage response = null; // Reply delivered to the application + private LinkedList aheadOfTimeReplies = new LinkedList(); + private Comparator comparator; + private Extractor extractor; /** * Constructor * - * @param id Process id for this client + * @see bellow */ public ServiceProxy(int processId) { - init(processId); - replies = new TOMMessage[getViewManager().getCurrentViewN()]; + this(processId, null, null, null); } /** * Constructor * - * @param id Process id for this client - * @param configHome Configuration directory for JBP - * TODO: E mesmo a directoria do JBP, ou de alguma biblioteca de terceiros? + * @see bellow */ public ServiceProxy(int processId, String configHome) { - init(processId, configHome); - replies = new TOMMessage[getViewManager().getCurrentViewN()]; + this(processId, configHome, null, null); } + /** + * Constructor + * + * @param id Process id for this client (should be different from replicas) + * @param configHome Configuration directory for BFT-SMART + * @param replyComparator used for comparing replies from different servers + * to extract one returned by f+1 + * @param replyExtractor used for extracting the response from the matching + * quorum of replies + */ + public ServiceProxy(int processId, String configHome, + Comparator replyComparator, Extractor replyExtractor) { + if (configHome == null) { + init(processId); + } else { + init(processId, configHome); + } + + replies = new TOMMessage[getViewManager().getCurrentViewN()]; + + comparator = (replyComparator != null)? replyComparator : new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return Arrays.equals(o1, o2) ? 0 : -1; + } + }; + + extractor = (replyExtractor != null)? replyExtractor : new Extractor() { + @Override + public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) { + return replies[lastReceived]; + } + }; + } - //******* EDUARDO BEGIN **************// /** * This method sends a request to the replicas, and returns the related reply. This method is * thread-safe. @@ -86,20 +122,17 @@ public class ServiceProxy extends TOMSender { return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST, false); } - /*public byte[] invoke(byte[] request) { - return invoke(request,false); - }*/ public byte[] invoke(byte[] request, boolean readOnly) { return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST, readOnly); } - //******* EDUARDO END **************// - /** - * This method sends a request to the replicas, and returns the related reply. This method is - * thread-safe. + * This method sends a request to the replicas, and returns the related reply. + * This method is thread-safe. * * @param request Request to be sent + * @param reqType TOM_NORMAL_REQUESTS for service requests, and other for + * reconfig requests. * @param readOnly it is a read only request (will not be ordered) * @return The reply from the replicas related to request */ @@ -109,21 +142,25 @@ public class ServiceProxy extends TOMSender { // This ensures the thread-safety by means of a semaphore try { this.mutexToSend.acquire(); - } catch (Exception e) { - e.printStackTrace(); - } + } catch (Exception e) {} - // Discard previous replies + // Clean all statefull data to prepare for receiving next replies Arrays.fill(replies, null); + receivedReplies = 0; response = null; + //if n=3f+1, read-only requests wait for 2f+1 matching replies while normal + //requests wait for only f+1 + replyQuorum = readOnly? + ((int) Math.ceil((getViewManager().getCurrentViewN() + getViewManager().getCurrentViewF()) / 2) + 1): + (getViewManager().getCurrentViewF() + 1); + + Logger.println("Sending request (readOnly = "+readOnly+")"); + Logger.println("Expected number of matching replies: "+replyQuorum); + // Send the request to the replicas, and get its ID doTOMulticast(request, reqType, readOnly); reqId = getLastSequenceNumber(); - // Critical section ends here. The semaphore can be released - //NAO PODE LIBERAR O SEMAFORO AQUI - //this.mutex.release(); - // This instruction blocks the thread, until a response is obtained. // The thread will be unblocked when the method replyReceived is invoked // by the client side communication system @@ -133,64 +170,85 @@ public class ServiceProxy extends TOMSender { ex.printStackTrace(); } - //******* EDUARDO BEGIN **************// + Logger.println("Response extracted = "+response); + + if (response == null) { + //the response can be null if n-f replies are received but there isn't + //a replyQuorum of matching replies + Logger.println("Received n-f replies and no response could be extracted."); - byte[] ret = null; - if (reqType == ReconfigurationManager.TOM_NORMAL_REQUEST) { - //Reply to a normal request! - if (response.getViewID() == getViewManager().getCurrentViewId()) { - ret = response.getContent(); // return the response - mutexToSend.release(); - return ret; - } else { // if(response.getViewID() > getViewManager().getCurrentViewId()){ - //updated view received - reconfigureTo((View) TOMUtil.getObject(response.getContent())); - mutexToSend.release(); - return invoke(request, reqType, readOnly); + mutexToSend.release(); + + if(readOnly) { + //invoke the operation again, whitout the read-only flag + Logger.println("###############################################"); + Logger.println("###################RETRY#######################"); + Logger.println("###############################################"); + return invoke(request, reqType, false); + } else { + throw new RuntimeException("Received n-f replies without f+1 of them matching."); } } else { - //Reply to a reconfigure request! - System.out.println("Recebeu reply para uma reconfigure request!"); - //É "impossivel" ser menor! - if (response.getViewID() > getViewManager().getCurrentViewId()) { - Object r = TOMUtil.getObject(response.getContent()); - if(r instanceof View){ //Não executou esta requisição pq a visao estava desatualizada - reconfigureTo((View) r); + //normal operation + //******* EDUARDO BEGIN **************// + byte[] ret = null; + if (reqType == ReconfigurationManager.TOM_NORMAL_REQUEST) { + //Reply to a normal request! + if (response.getViewID() == getViewManager().getCurrentViewId()) { + ret = response.getContent(); // return the response + mutexToSend.release(); + return ret; + } else {//if(response.getViewID() > getViewManager().getCurrentViewId()) + //updated view received + reconfigureTo((View) TOMUtil.getObject(response.getContent())); mutexToSend.release(); return invoke(request, reqType, readOnly); - }else{ //Executou a requisição de reconfiguração - reconfigureTo(((ReconfigureReply)r).getView()); + } + } else { + //Reply to a reconfigure request! + Logger.println("Reconfiguration request' reply received!"); + //É "impossivel" ser menor! + if (response.getViewID() > getViewManager().getCurrentViewId()) { + Object r = TOMUtil.getObject(response.getContent()); + if (r instanceof View) { //Não executou esta requisição pq a visao estava desatualizada + reconfigureTo((View) r); + mutexToSend.release(); + return invoke(request, reqType, readOnly); + } else { //Executou a requisição de reconfiguração + reconfigureTo(((ReconfigureReply) r).getView()); + ret = response.getContent(); // return the response + mutexToSend.release(); + return ret; + } + } else { + //Caso a reconfiguração nao foi executada porque algum parametro + // da requisição estava incorreto: o processo queria fazer algo que nao é permitido ret = response.getContent(); // return the response mutexToSend.release(); return ret; } - }else{ - //Caso a reconfiguração nao foi executada porque algum parametro - // da requisição estava incorreto: o processo queria fazer algo que nao é permitido - ret = response.getContent(); // return the response - mutexToSend.release(); - return ret; } } //******* EDUARDO END **************// } //******* EDUARDO BEGIN **************// - private void reconfigureTo(View v){ - System.out.println("Recebeu uma visão mais atual!"); + private void reconfigureTo(View v) { + Logger.println("Installing a most up-to-date view"); getViewManager().reconfigureTo(v); replies = new TOMMessage[getViewManager().getCurrentViewN()]; getCommunicationSystem().updateConnections(); } //******* EDUARDO END **************// + /** * This is the method invoked by the client side comunication system. * * @param reply The reply delivered by the client side comunication system */ + @Override public void replyReceived(TOMMessage reply) { - - + Logger.println("reply received: sender="+reply.getSender()+" reqId="+reply.getSequence()); // Ahead lies a critical section. // This ensures the thread-safety by means of a semaphore try { @@ -199,54 +257,68 @@ public class ServiceProxy extends TOMSender { e.printStackTrace(); } - int sender = reply.getSender(); - - //******* EDUARDO BEGIN **************// - int pos = getViewManager().getCurrentViewPos(sender); - //******* EDUARDO END **************// - + //******* EDUARDO BEGIN **************// + int pos = getViewManager().getCurrentViewPos(reply.getSender()); if (pos < 0) { //ignore messages that don't come from replicas + this.mutex.release(); return; } + //******* EDUARDO END **************// + if (reply.getSequence() > reqId) { // Is this a reply for the last request sent? + Logger.println("Storing reply from "+reply.getSender()+" with reqId:"+reply.getSequence()); + aheadOfTimeReplies.add(reply); + } else if(reply.getSequence() == reqId) { + Logger.println("Receiving reply from "+reply.getSender()+" with reqId:"+reply.getSequence()+". Putting on pos="+pos); - //System.out.println("Recebeu reply de "+sender+" pos: "+pos+" reqId:"+reply.getSequence()); + if(receivedReplies == 0) { + //If this is the first reply received for reqId, lets look at ahead + //of time messages to process possible messages for this reqId that + //were already received + for(ListIterator li = aheadOfTimeReplies.listIterator(); li.hasNext(); ) { + TOMMessage rr = li.next(); + if(rr.getSequence() == reqId) { + Logger.println("Adding old message."); + int rpos = getViewManager().getCurrentViewPos(rr.getSender()); + receivedReplies++; + replies[rpos] = rr; + li.remove(); + } + } + } - if (reply.getSequence() == reqId) { // Is this a reply for the last request sent? - //System.out.println("Vai contar reply de "+sender+" pos: "+pos+" reqId:"+reqId); + receivedReplies++; replies[pos] = reply; + // Compare the reply just received, to the others int sameContent = 1; for (int i = 0; i < replies.length; i++) { - if (i != pos && replies[i] != null && Arrays.equals(replies[i].getContent(), reply.getContent())) { + Logger.println(i+" ("+reqId+"): "+sameContent+"/"+receivedReplies+"/"+replyQuorum); + if (i != pos && replies[i] != null && (comparator.compare(replies[i].getContent(), reply.getContent()) == 0)) { sameContent++; - //******* EDUARDO BEGIN **************// - if (sameContent >= getViewManager().getCurrentViewF() + 1) { - //******* EDUARDO END **************// + + if (sameContent >= replyQuorum) { + response = extractor.extractResponse(replies, sameContent, pos); + reqId = -1; + this.sm.release(); // resumes the thread that is executing the "invoke" method break; } - }/*else{ - if(i != pos && replies[i] != null){ - System.out.println("Resposta diferente......... "+sender); - } - }*/ + } } - //System.out.println("contador "+sameContent); - - //******* EDUARDO BEGIN **************// - // Is there already more than f equal replies? - if (sameContent >= getViewManager().getCurrentViewF() + 1) { - response = reply; + if (response == null && receivedReplies >= /*n-f*/ + getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) { + //it's not safe to wait for more replies (n-f replies received), + //but there is no response available... reqId = -1; - //this.mutexToSend.release(); - this.sm.release(); // unblocks the thread that is executing the "invoke" method, - // so it can deliver the reply to the application + Logger.println("releasing sm without response"); + this.sm.release(); // resumes the thread that is executing the "invoke" method } - //******* EDUARDO END **************// + } else { + Logger.println("Discarding reply from "+reply.getSender()+" with reqId:"+reply.getSequence()+". Putting on pos="+pos); } + // Critical section ends here. The semaphore can be released this.mutex.release(); } } -