/** * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags * * This file is part of SMaRt. * * SMaRt is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * SMaRt is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along with SMaRt. If not, see . */ package navigators.smart.tom; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import navigators.smart.reconfiguration.ReconfigurationManager; import navigators.smart.reconfiguration.ReconfigureReply; import navigators.smart.reconfiguration.View; import navigators.smart.tom.core.messages.TOMMessage; import navigators.smart.tom.util.Extractor; import navigators.smart.tom.util.Logger; import navigators.smart.tom.util.TOMUtil; /** * This class implements a TOMSender and represents a proxy to be used on the * client side of the replicated system. * It sends a request to the replicas, receives the reply, and delivers it to * the application. */ public class ServiceProxy extends TOMSender { // Locks for send requests and receive replies private ReentrantLock canReceiveLock = new ReentrantLock(); private ReentrantLock canSendLock = new ReentrantLock(); private Semaphore sm = new Semaphore(0); private int reqId = -1; // request id private int replyQuorum = 0; // size of the reply quorum private TOMMessage replies[] = null; // Replies from replicas are stored here private int receivedReplies = 0; // Number of received replies private TOMMessage response = null; // Reply delivered to the application //private LinkedList aheadOfTimeReplies = new LinkedList(); private int invokeTimeout = 60; private Comparator comparator; private Extractor extractor; /** * Constructor * * @see bellow */ public ServiceProxy(int processId) { this(processId, null, null, null); } /** * Constructor * * @see bellow */ public ServiceProxy(int processId, String configHome) { this(processId, configHome, null, null); } /** * Constructor * * @param id Process id for this client (should be different from replicas) * @param configHome Configuration directory for BFT-SMART * @param replyComparator used for comparing replies from different servers * to extract one returned by f+1 * @param replyExtractor used for extracting the response from the matching * quorum of replies */ public ServiceProxy(int processId, String configHome, Comparator replyComparator, Extractor replyExtractor) { if (configHome == null) { init(processId); } else { init(processId, configHome); } replies = new TOMMessage[getViewManager().getCurrentViewN()]; comparator = (replyComparator != null) ? replyComparator : new Comparator() { @Override public int compare(byte[] o1, byte[] o2) { return Arrays.equals(o1, o2) ? 0 : -1; } }; extractor = (replyExtractor != null) ? replyExtractor : new Extractor() { @Override public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) { return replies[lastReceived]; } }; } /** * Get the amount of time (in seconds) that this proxy will wait for * servers replies before returning null. * * @return the invokeTimeout */ public int getInvokeTimeout() { return invokeTimeout; } /** * Set the amount of time (in seconds) that this proxy will wait for * servers replies before returning null. * * @param invokeTimeout the invokeTimeout to set */ public void setInvokeTimeout(int invokeTimeout) { this.invokeTimeout = invokeTimeout; } /** * This method sends a request to the replicas, and returns the related reply. This method is * thread-safe. * * @param request Request to be sent * @return The reply from the replicas related to request */ public byte[] invoke(byte[] request) { return invoke(request, ReconfigurationManager.TOM_NORMAL_REQUEST); } public byte[] invoke(byte[] request, boolean readOnly) { int type = (readOnly) ? ReconfigurationManager.TOM_READONLY_REQUEST : ReconfigurationManager.TOM_NORMAL_REQUEST; return invoke(request, type); } /** * This method sends a request to the replicas, and returns the related reply. * If the servers take more than invokeTimeout seconds the method returns null. * This method is thread-safe. * * @param request Request to be sent * @param reqType TOM_NORMAL_REQUESTS for service requests, and other for * reconfig requests. * @return The reply from the replicas related to request */ public byte[] invoke(byte[] request, int reqType) { canSendLock.lock(); // Clean all statefull data to prepare for receiving next replies Arrays.fill(replies, null); receivedReplies = 0; response = null; replyQuorum = (int) Math.ceil((getViewManager().getCurrentViewN() + getViewManager().getCurrentViewF()) / 2) + 1; // Send the request to the replicas, and get its ID reqId = generateRequestId(); TOMulticast(request, reqId, reqType); Logger.println("Sending request (readOnly = " + (reqType == ReconfigurationManager.TOM_READONLY_REQUEST) + ") with reqId=" + reqId); Logger.println("Expected number of matching replies: " + replyQuorum); // This instruction blocks the thread, until a response is obtained. // The thread will be unblocked when the method replyReceived is invoked // by the client side communication system try { if (!this.sm.tryAcquire(invokeTimeout, TimeUnit.SECONDS)) { Logger.println("###################TIMEOUT#######################"); Logger.println("Reply timeout for reqId=" + reqId); canSendLock.unlock(); return null; } } catch (InterruptedException ex) { } Logger.println("Response extracted = " + response); byte[] ret = null; if (response == null) { //the response can be null if n-f replies are received but there isn't //a replyQuorum of matching replies Logger.println("Received n-f replies and no response could be extracted."); canSendLock.unlock(); if (reqType == ReconfigurationManager.TOM_READONLY_REQUEST) { //invoke the operation again, whitout the read-only flag Logger.println("###################RETRY#######################"); return invoke(request); } else { throw new RuntimeException("Received n-f replies without f+1 of them matching."); } } else { //normal operation //******* EDUARDO BEGIN **************// if (reqType == ReconfigurationManager.TOM_NORMAL_REQUEST) { //Reply to a normal request! if (response.getViewID() == getViewManager().getCurrentViewId()) { ret = response.getContent(); // return the response } else {//if(response.getViewID() > getViewManager().getCurrentViewId()) //updated view received reconfigureTo((View) TOMUtil.getObject(response.getContent())); canSendLock.unlock(); return invoke(request, reqType); } } else { //Reply to a reconfigure request! Logger.println("Reconfiguration request' reply received!"); //It is impossible to be less than... if (response.getViewID() > getViewManager().getCurrentViewId()) { Object r = TOMUtil.getObject(response.getContent()); if (r instanceof View) { //did not executed the request because it is using an outdated view reconfigureTo((View) r); canSendLock.unlock(); return invoke(request, reqType); } else { //reconfiguration executed! reconfigureTo(((ReconfigureReply) r).getView()); ret = response.getContent(); } } else { //Caso a reconfiguração nao foi executada porque algum parametro // da requisição estava incorreto: o processo queria fazer algo que nao é permitido ret = response.getContent(); } } } //******* EDUARDO END **************// canSendLock.unlock(); return ret; } //******* EDUARDO BEGIN **************// private void reconfigureTo(View v) { Logger.println("Installing a most up-to-date view with id=" + v.getId()); getViewManager().reconfigureTo(v); replies = new TOMMessage[getViewManager().getCurrentViewN()]; getCommunicationSystem().updateConnections(); } //******* EDUARDO END **************// /** * This is the method invoked by the client side comunication system. * * @param reply The reply delivered by the client side comunication system */ @Override public void replyReceived(TOMMessage reply) { canReceiveLock.lock(); if (reqId == -1) {//no message being expected Logger.println("throwing out request: sender=" + reply.getSender() + " reqId=" + reply.getSequence()); canReceiveLock.unlock(); return; } //******* EDUARDO BEGIN **************// int pos = getViewManager().getCurrentViewPos(reply.getSender()); if (pos < 0) { //ignore messages that don't come from replicas canReceiveLock.unlock(); return; } //******* EDUARDO END **************// /* if (reply.getSequence() > reqId) { // Is this a reply for the last request sent? Logger.println("Storing reply from "+reply.getSender()+" with reqId:"+reply.getSequence()); aheadOfTimeReplies.add(reply); } else */ if (reply.getSequence() == reqId) { Logger.println("Receiving reply from " + reply.getSender() + " with reqId:" + reply.getSequence() + ". Putting on pos=" + pos); /* if(receivedReplies == 0) { //If this is the first reply received for reqId, lets look at ahead //of time messages to process possible messages for this reqId that //were already received for(ListIterator li = aheadOfTimeReplies.listIterator(); li.hasNext(); ) { TOMMessage rr = li.next(); if(rr.getSequence() == reqId) { int rpos = getViewManager().getCurrentViewPos(rr.getSender()); if(replies[rpos] == null) receivedReplies++; replies[rpos] = rr; li.remove(); } } } */ if (replies[pos] == null) { receivedReplies++; } replies[pos] = reply; // Compare the reply just received, to the others int sameContent = 1; for (int i = 0; i < replies.length; i++) { if (i != pos && replies[i] != null && (comparator.compare(replies[i].getContent(), reply.getContent()) == 0)) { sameContent++; if (sameContent >= replyQuorum) { response = extractor.extractResponse(replies, sameContent, pos); reqId = -1; this.sm.release(); // resumes the thread that is executing the "invoke" method break; } } } if (response == null && receivedReplies >= getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) { //it's not safe to wait for more replies (n-f replies received), //but there is no response available... reqId = -1; this.sm.release(); // resumes the thread that is executing the "invoke" method } } // Critical section ends here. The semaphore can be released canReceiveLock.unlock(); } }