提交 300885d7 编写于 作者: B bessani@gmail.com

Corrected small bugs related with number of messages needed for non-read-only...

Corrected small bugs related with number of messages needed for non-read-only request (it is 2f+1 instaead of f+1) and also added a invokeTimeout property that defines the amount of time a client wait for replies from an invoke before return null.
上级 0417af6a
...@@ -22,6 +22,7 @@ import java.util.Comparator; ...@@ -22,6 +22,7 @@ import java.util.Comparator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import navigators.smart.reconfiguration.ReconfigurationManager; import navigators.smart.reconfiguration.ReconfigurationManager;
...@@ -44,7 +45,6 @@ public class ServiceProxy extends TOMSender { ...@@ -44,7 +45,6 @@ public class ServiceProxy extends TOMSender {
private ReentrantLock canReceiveLock = new ReentrantLock(); private ReentrantLock canReceiveLock = new ReentrantLock();
private ReentrantLock canSendLock = new ReentrantLock(); private ReentrantLock canSendLock = new ReentrantLock();
//******* EDUARDO END **************//
private Semaphore sm = new Semaphore(0); private Semaphore sm = new Semaphore(0);
private int reqId = -1; // request id private int reqId = -1; // request id
...@@ -53,9 +53,13 @@ public class ServiceProxy extends TOMSender { ...@@ -53,9 +53,13 @@ public class ServiceProxy extends TOMSender {
private int receivedReplies = 0; // Number of received replies private int receivedReplies = 0; // Number of received replies
private TOMMessage response = null; // Reply delivered to the application private TOMMessage response = null; // Reply delivered to the application
private LinkedList<TOMMessage> aheadOfTimeReplies = new LinkedList<TOMMessage>(); private LinkedList<TOMMessage> aheadOfTimeReplies = new LinkedList<TOMMessage>();
private int invokeTimeout = 60;
private Comparator comparator; private Comparator comparator;
private Extractor extractor; private Extractor extractor;
/** /**
* Constructor * Constructor
* *
...@@ -109,6 +113,27 @@ public class ServiceProxy extends TOMSender { ...@@ -109,6 +113,27 @@ public class ServiceProxy extends TOMSender {
}; };
} }
/**
* Get the amount of time (in seconds) that this proxy will wait for
* servers replies before returning null.
*
* @return the invokeTimeout
*/
public int getInvokeTimeout() {
return invokeTimeout;
}
/**
* Set the amount of time (in seconds) that this proxy will wait for
* servers replies before returning null.
*
* @param invokeTimeout the invokeTimeout to set
*/
public void setInvokeTimeout(int invokeTimeout) {
this.invokeTimeout = invokeTimeout;
}
/** /**
* This method sends a request to the replicas, and returns the related reply. This method is * This method sends a request to the replicas, and returns the related reply. This method is
* thread-safe. * thread-safe.
...@@ -126,6 +151,7 @@ public class ServiceProxy extends TOMSender { ...@@ -126,6 +151,7 @@ public class ServiceProxy extends TOMSender {
/** /**
* This method sends a request to the replicas, and returns the related reply. * This method sends a request to the replicas, and returns the related reply.
* If the servers take more than invokeTimeout seconds the method returns null.
* This method is thread-safe. * This method is thread-safe.
* *
* @param request Request to be sent * @param request Request to be sent
...@@ -141,12 +167,9 @@ public class ServiceProxy extends TOMSender { ...@@ -141,12 +167,9 @@ public class ServiceProxy extends TOMSender {
Arrays.fill(replies, null); Arrays.fill(replies, null);
receivedReplies = 0; receivedReplies = 0;
response = null; response = null;
//if n=3f+1, read-only requests wait for 2f+1 matching replies while normal
//requests wait for only f+1 replyQuorum = (int) Math.ceil((getViewManager().getCurrentViewN() +
replyQuorum = readOnly? getViewManager().getCurrentViewF()) / 2) + 1;
((int) Math.ceil((getViewManager().getCurrentViewN() +
getViewManager().getCurrentViewF()) / 2) + 1):
(getViewManager().getCurrentViewF() + 1);
// Send the request to the replicas, and get its ID // Send the request to the replicas, and get its ID
reqId = generateRequestId(); reqId = generateRequestId();
...@@ -159,7 +182,12 @@ public class ServiceProxy extends TOMSender { ...@@ -159,7 +182,12 @@ public class ServiceProxy extends TOMSender {
// The thread will be unblocked when the method replyReceived is invoked // The thread will be unblocked when the method replyReceived is invoked
// by the client side communication system // by the client side communication system
try { try {
this.sm.acquire(); if(!this.sm.tryAcquire(invokeTimeout, TimeUnit.SECONDS)) {
Logger.println("###################TIMEOUT#######################");
Logger.println("Reply timeout for reqId="+reqId);
canSendLock.unlock();
return null;
}
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
} }
...@@ -175,9 +203,7 @@ public class ServiceProxy extends TOMSender { ...@@ -175,9 +203,7 @@ public class ServiceProxy extends TOMSender {
canSendLock.unlock(); canSendLock.unlock();
if(readOnly) { if(readOnly) {
//invoke the operation again, whitout the read-only flag //invoke the operation again, whitout the read-only flag
Logger.println("###############################################");
Logger.println("###################RETRY#######################"); Logger.println("###################RETRY#######################");
Logger.println("###############################################");
return invoke(request, reqType, false); return invoke(request, reqType, false);
} else { } else {
throw new RuntimeException("Received n-f replies without f+1 of them matching."); throw new RuntimeException("Received n-f replies without f+1 of them matching.");
...@@ -269,14 +295,14 @@ public class ServiceProxy extends TOMSender { ...@@ -269,14 +295,14 @@ public class ServiceProxy extends TOMSender {
TOMMessage rr = li.next(); TOMMessage rr = li.next();
if(rr.getSequence() == reqId) { if(rr.getSequence() == reqId) {
int rpos = getViewManager().getCurrentViewPos(rr.getSender()); int rpos = getViewManager().getCurrentViewPos(rr.getSender());
receivedReplies++; if(replies[rpos] == null) receivedReplies++;
replies[rpos] = rr; replies[rpos] = rr;
li.remove(); li.remove();
} }
} }
} }
receivedReplies++; if(replies[pos] == null) receivedReplies++;
replies[pos] = reply; replies[pos] = reply;
// Compare the reply just received, to the others // Compare the reply just received, to the others
...@@ -294,8 +320,8 @@ public class ServiceProxy extends TOMSender { ...@@ -294,8 +320,8 @@ public class ServiceProxy extends TOMSender {
} }
} }
if (response == null && receivedReplies >= /*n-f*/ if (response == null &&
getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) { receivedReplies >= getViewManager().getCurrentViewN() - getViewManager().getCurrentViewF()) {
//it's not safe to wait for more replies (n-f replies received), //it's not safe to wait for more replies (n-f replies received),
//but there is no response available... //but there is no response available...
reqId = -1; reqId = -1;
...@@ -306,4 +332,5 @@ public class ServiceProxy extends TOMSender { ...@@ -306,4 +332,5 @@ public class ServiceProxy extends TOMSender {
// Critical section ends here. The semaphore can be released // Critical section ends here. The semaphore can be released
canReceiveLock.unlock(); canReceiveLock.unlock();
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册