From 11abfba6789952257df91779590865d886459e93 Mon Sep 17 00:00:00 2001 From: andrew Date: Tue, 3 Sep 2019 05:19:25 +0100 Subject: [PATCH] 8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize Reviewed-by: dfuchs --- .../classes/com/sun/jndi/ldap/BerDecoder.java | 12 ++- .../classes/com/sun/jndi/ldap/Connection.java | 67 +++----------- .../com/sun/jndi/ldap/LdapRequest.java | 88 ++++++++++--------- 3 files changed, 67 insertions(+), 100 deletions(-) diff --git a/src/share/classes/com/sun/jndi/ldap/BerDecoder.java b/src/share/classes/com/sun/jndi/ldap/BerDecoder.java index 9feefb47f..58a64154f 100644 --- a/src/share/classes/com/sun/jndi/ldap/BerDecoder.java +++ b/src/share/classes/com/sun/jndi/ldap/BerDecoder.java @@ -186,12 +186,16 @@ public final class BerDecoder extends Ber { * */ private int parseIntWithTag(int tag) throws DecodeException { - - if (parseByte() != tag) { + // Ber could have been reset; + String s; + if (offset > 0) { + s = Integer.toString(buf[offset - 1] & 0xff); + } else { + s = "Empty tag"; + } throw new DecodeException("Encountered ASN.1 tag " + - Integer.toString(buf[offset - 1] & 0xff) + - " (expected tag " + Integer.toString(tag) + ")"); + s + " (expected tag " + Integer.toString(tag) + ")"); } int len = parseLength(); diff --git a/src/share/classes/com/sun/jndi/ldap/Connection.java b/src/share/classes/com/sun/jndi/ldap/Connection.java index 0c059f5bf..43e83c12b 100644 --- a/src/share/classes/com/sun/jndi/ldap/Connection.java +++ b/src/share/classes/com/sun/jndi/ldap/Connection.java @@ -453,65 +453,29 @@ public final class Connection implements Runnable { /** * Reads a reply; waits until one is ready. */ - BerDecoder readReply(LdapRequest ldr) - throws IOException, NamingException { + BerDecoder readReply(LdapRequest ldr) throws IOException, NamingException { BerDecoder rber; - // Track down elapsed time to workaround spurious wakeups - long elapsedMilli = 0; - long elapsedNano = 0; - - while (((rber = ldr.getReplyBer()) == null) && - (readTimeout <= 0 || elapsedMilli < readTimeout)) - { - try { - // If socket closed, don't even try - synchronized (this) { - if (sock == null) { - throw new ServiceUnavailableException(host + ":" + port + - "; socket closed"); - } - } - synchronized (ldr) { - // check if condition has changed since our last check - rber = ldr.getReplyBer(); - if (rber == null) { - if (readTimeout > 0) { // Socket read timeout is specified - long beginNano = System.nanoTime(); - - // will be woken up before readTimeout if reply is - // available - ldr.wait(readTimeout - elapsedMilli); - elapsedNano += (System.nanoTime() - beginNano); - elapsedMilli += elapsedNano / 1000_000; - elapsedNano %= 1000_000; - - } else { - // no timeout is set so we wait infinitely until - // a response is received - // https://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP - ldr.wait(); - } - } else { - break; - } - } - } catch (InterruptedException ex) { - throw new InterruptedNamingException( - "Interrupted during LDAP operation"); - } + try { + // if no timeout is set so we wait infinitely until + // a response is received + // http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP + rber = ldr.getReplyBer(readTimeout); + } catch (InterruptedException ex) { + throw new InterruptedNamingException( + "Interrupted during LDAP operation"); } - if ((rber == null) && (elapsedMilli >= readTimeout)) { + if (rber == null) { abandonRequest(ldr, null); - throw new NamingException("LDAP response read timed out, timeout used:" + throw new NamingException( + "LDAP response read timed out, timeout used:" + readTimeout + "ms." ); } return rber; } - //////////////////////////////////////////////////////////////////////////// // // Methods to add, find, delete, and abandon requests made to server @@ -705,14 +669,11 @@ public final class Connection implements Runnable { if (nparent) { LdapRequest ldr = pendingRequests; while (ldr != null) { - - synchronized (ldr) { - ldr.notify(); + ldr.close(); ldr = ldr.next; } } } - } if (nparent) { parent.processConnectionClosure(); } @@ -800,7 +761,7 @@ public final class Connection implements Runnable { * the safest thing to do is to shut it down. */ - private Object pauseLock = new Object(); // lock for reader to wait on while paused + private final Object pauseLock = new Object(); // lock for reader to wait on while paused private boolean paused = false; // paused state of reader /* diff --git a/src/share/classes/com/sun/jndi/ldap/LdapRequest.java b/src/share/classes/com/sun/jndi/ldap/LdapRequest.java index 23347ac5d..ddea9d40b 100644 --- a/src/share/classes/com/sun/jndi/ldap/LdapRequest.java +++ b/src/share/classes/com/sun/jndi/ldap/LdapRequest.java @@ -29,55 +29,52 @@ import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.naming.CommunicationException; +import java.util.concurrent.TimeUnit; final class LdapRequest { - LdapRequest next; // Set/read in synchronized Connection methods - int msgId; // read-only + private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0); - private int gotten = 0; - private BlockingQueue replies; - private int highWatermark = -1; - private boolean cancelled = false; - private boolean pauseAfterReceipt = false; - private boolean completed = false; + LdapRequest next; // Set/read in synchronized Connection methods + final int msgId; // read-only - LdapRequest(int msgId, boolean pause) { - this(msgId, pause, -1); - } + private final BlockingQueue replies; + private volatile boolean cancelled; + private volatile boolean closed; + private volatile boolean completed; + private final boolean pauseAfterReceipt; LdapRequest(int msgId, boolean pause, int replyQueueCapacity) { this.msgId = msgId; this.pauseAfterReceipt = pause; if (replyQueueCapacity == -1) { - this.replies = new LinkedBlockingQueue(); + this.replies = new LinkedBlockingQueue<>(); } else { - this.replies = - new LinkedBlockingQueue(replyQueueCapacity); - highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity + this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10); } } - synchronized void cancel() { + void cancel() { cancelled = true; + replies.offer(EOF); + } - // Unblock reader of pending request - // Should only ever have atmost one waiter - notify(); + synchronized void close() { + closed = true; + replies.offer(EOF); + } + + private boolean isClosed() { + return closed && (replies.size() == 0 || replies.peek() == EOF); } synchronized boolean addReplyBer(BerDecoder ber) { - if (cancelled) { + // check the closed boolean value here as we don't want anything + // to be added to the queue after close() has been called. + if (cancelled || closed) { return false; } - // Add a new reply to the queue of unprocessed replies. - try { - replies.put(ber); - } catch (InterruptedException e) { - // ignore - } - // peek at the BER buffer to check if it is a SearchResultDone PDU try { ber.parseSeq(null); @@ -88,33 +85,38 @@ final class LdapRequest { } ber.reset(); - notify(); // notify anyone waiting for reply - /* - * If a queue capacity has been set then trigger a pause when the - * queue has filled to 80% capacity. Later, when the queue has drained - * then the reader gets unpaused. - */ - if (highWatermark != -1 && replies.size() >= highWatermark) { - return true; // trigger the pause + // Add a new reply to the queue of unprocessed replies. + try { + replies.put(ber); + } catch (InterruptedException e) { + // ignore } + return pauseAfterReceipt; } - synchronized BerDecoder getReplyBer() throws CommunicationException { + BerDecoder getReplyBer(long millis) throws CommunicationException, + InterruptedException { + if (cancelled) { + throw new CommunicationException("Request: " + msgId + + " cancelled"); + } + if (isClosed()) { + return null; + } + + BerDecoder result = millis > 0 ? + replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take(); + if (cancelled) { throw new CommunicationException("Request: " + msgId + " cancelled"); } - /* - * Remove a reply if the queue is not empty. - * poll returns null if queue is empty. - */ - BerDecoder reply = replies.poll(); - return reply; + return result == EOF ? null : result; } - synchronized boolean hasSearchCompleted() { + boolean hasSearchCompleted() { return completed; } } -- GitLab