提交 11abfba6 编写于 作者: A andrew

8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize

Reviewed-by: dfuchs
上级 4485f040
...@@ -186,12 +186,16 @@ public final class BerDecoder extends Ber { ...@@ -186,12 +186,16 @@ public final class BerDecoder extends Ber {
*</pre></blockquote> *</pre></blockquote>
*/ */
private int parseIntWithTag(int tag) throws DecodeException { private int parseIntWithTag(int tag) throws DecodeException {
if (parseByte() != tag) { if (parseByte() != tag) {
// Ber could have been reset;
String s;
if (offset > 0) {
s = Integer.toString(buf[offset - 1] & 0xff);
} else {
s = "Empty tag";
}
throw new DecodeException("Encountered ASN.1 tag " + throw new DecodeException("Encountered ASN.1 tag " +
Integer.toString(buf[offset - 1] & 0xff) + s + " (expected tag " + Integer.toString(tag) + ")");
" (expected tag " + Integer.toString(tag) + ")");
} }
int len = parseLength(); int len = parseLength();
......
...@@ -453,65 +453,29 @@ public final class Connection implements Runnable { ...@@ -453,65 +453,29 @@ public final class Connection implements Runnable {
/** /**
* Reads a reply; waits until one is ready. * Reads a reply; waits until one is ready.
*/ */
BerDecoder readReply(LdapRequest ldr) BerDecoder readReply(LdapRequest ldr) throws IOException, NamingException {
throws IOException, NamingException {
BerDecoder rber; BerDecoder rber;
// Track down elapsed time to workaround spurious wakeups
long elapsedMilli = 0;
long elapsedNano = 0;
while (((rber = ldr.getReplyBer()) == null) &&
(readTimeout <= 0 || elapsedMilli < readTimeout))
{
try { try {
// If socket closed, don't even try // if no timeout is set so we wait infinitely until
synchronized (this) {
if (sock == null) {
throw new ServiceUnavailableException(host + ":" + port +
"; socket closed");
}
}
synchronized (ldr) {
// check if condition has changed since our last check
rber = ldr.getReplyBer();
if (rber == null) {
if (readTimeout > 0) { // Socket read timeout is specified
long beginNano = System.nanoTime();
// will be woken up before readTimeout if reply is
// available
ldr.wait(readTimeout - elapsedMilli);
elapsedNano += (System.nanoTime() - beginNano);
elapsedMilli += elapsedNano / 1000_000;
elapsedNano %= 1000_000;
} else {
// no timeout is set so we wait infinitely until
// a response is received // a response is received
// https://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP // http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
ldr.wait(); rber = ldr.getReplyBer(readTimeout);
}
} else {
break;
}
}
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
throw new InterruptedNamingException( throw new InterruptedNamingException(
"Interrupted during LDAP operation"); "Interrupted during LDAP operation");
} }
}
if ((rber == null) && (elapsedMilli >= readTimeout)) { if (rber == null) {
abandonRequest(ldr, null); abandonRequest(ldr, null);
throw new NamingException("LDAP response read timed out, timeout used:" throw new NamingException(
"LDAP response read timed out, timeout used:"
+ readTimeout + "ms." ); + readTimeout + "ms." );
} }
return rber; return rber;
} }
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// //
// Methods to add, find, delete, and abandon requests made to server // Methods to add, find, delete, and abandon requests made to server
...@@ -705,14 +669,11 @@ public final class Connection implements Runnable { ...@@ -705,14 +669,11 @@ public final class Connection implements Runnable {
if (nparent) { if (nparent) {
LdapRequest ldr = pendingRequests; LdapRequest ldr = pendingRequests;
while (ldr != null) { while (ldr != null) {
ldr.close();
synchronized (ldr) {
ldr.notify();
ldr = ldr.next; ldr = ldr.next;
} }
} }
} }
}
if (nparent) { if (nparent) {
parent.processConnectionClosure(); parent.processConnectionClosure();
} }
...@@ -800,7 +761,7 @@ public final class Connection implements Runnable { ...@@ -800,7 +761,7 @@ public final class Connection implements Runnable {
* the safest thing to do is to shut it down. * the safest thing to do is to shut it down.
*/ */
private Object pauseLock = new Object(); // lock for reader to wait on while paused private final Object pauseLock = new Object(); // lock for reader to wait on while paused
private boolean paused = false; // paused state of reader private boolean paused = false; // paused state of reader
/* /*
......
...@@ -29,53 +29,50 @@ import java.io.IOException; ...@@ -29,53 +29,50 @@ import java.io.IOException;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException; import javax.naming.CommunicationException;
import java.util.concurrent.TimeUnit;
final class LdapRequest { final class LdapRequest {
LdapRequest next; // Set/read in synchronized Connection methods private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
int msgId; // read-only
private int gotten = 0; LdapRequest next; // Set/read in synchronized Connection methods
private BlockingQueue<BerDecoder> replies; final int msgId; // read-only
private int highWatermark = -1;
private boolean cancelled = false;
private boolean pauseAfterReceipt = false;
private boolean completed = false;
LdapRequest(int msgId, boolean pause) { private final BlockingQueue<BerDecoder> replies;
this(msgId, pause, -1); private volatile boolean cancelled;
} private volatile boolean closed;
private volatile boolean completed;
private final boolean pauseAfterReceipt;
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) { LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId; this.msgId = msgId;
this.pauseAfterReceipt = pause; this.pauseAfterReceipt = pause;
if (replyQueueCapacity == -1) { if (replyQueueCapacity == -1) {
this.replies = new LinkedBlockingQueue<BerDecoder>(); this.replies = new LinkedBlockingQueue<>();
} else { } else {
this.replies = this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
} }
} }
synchronized void cancel() { void cancel() {
cancelled = true; cancelled = true;
replies.offer(EOF);
}
// Unblock reader of pending request synchronized void close() {
// Should only ever have atmost one waiter closed = true;
notify(); replies.offer(EOF);
} }
synchronized boolean addReplyBer(BerDecoder ber) { private boolean isClosed() {
if (cancelled) { return closed && (replies.size() == 0 || replies.peek() == EOF);
return false;
} }
// Add a new reply to the queue of unprocessed replies. synchronized boolean addReplyBer(BerDecoder ber) {
try { // check the closed boolean value here as we don't want anything
replies.put(ber); // to be added to the queue after close() has been called.
} catch (InterruptedException e) { if (cancelled || closed) {
// ignore return false;
} }
// peek at the BER buffer to check if it is a SearchResultDone PDU // peek at the BER buffer to check if it is a SearchResultDone PDU
...@@ -88,33 +85,38 @@ final class LdapRequest { ...@@ -88,33 +85,38 @@ final class LdapRequest {
} }
ber.reset(); ber.reset();
notify(); // notify anyone waiting for reply // Add a new reply to the queue of unprocessed replies.
/* try {
* If a queue capacity has been set then trigger a pause when the replies.put(ber);
* queue has filled to 80% capacity. Later, when the queue has drained } catch (InterruptedException e) {
* then the reader gets unpaused. // ignore
*/
if (highWatermark != -1 && replies.size() >= highWatermark) {
return true; // trigger the pause
} }
return pauseAfterReceipt; return pauseAfterReceipt;
} }
synchronized BerDecoder getReplyBer() throws CommunicationException { BerDecoder getReplyBer(long millis) throws CommunicationException,
InterruptedException {
if (cancelled) {
throw new CommunicationException("Request: " + msgId +
" cancelled");
}
if (isClosed()) {
return null;
}
BerDecoder result = millis > 0 ?
replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
if (cancelled) { if (cancelled) {
throw new CommunicationException("Request: " + msgId + throw new CommunicationException("Request: " + msgId +
" cancelled"); " cancelled");
} }
/* return result == EOF ? null : result;
* Remove a reply if the queue is not empty.
* poll returns null if queue is empty.
*/
BerDecoder reply = replies.poll();
return reply;
} }
synchronized boolean hasSearchCompleted() { boolean hasSearchCompleted() {
return completed; return completed;
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册