/*
 * Copyright (c) 1999, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package com.sun.jndi.ldap;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.net.Socket;
35
import javax.net.ssl.SSLSocket;
D
duke 已提交
36 37 38 39 40 41 42 43 44 45 46

import javax.naming.CommunicationException;
import javax.naming.ServiceUnavailableException;
import javax.naming.NamingException;
import javax.naming.InterruptedNamingException;

import javax.naming.ldap.Control;

import java.lang.reflect.Method;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
47 48
import java.util.Arrays;
import sun.misc.IOUtils;
D
duke 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
//import javax.net.SocketFactory;

/**
  * A thread that creates a connection to an LDAP server.
  * After the connection, the thread reads from the connection.
  * A caller can invoke methods on the instance to read LDAP responses
  * and to send LDAP requests.
  * <p>
  * There is a one-to-one correspondence between an LdapClient and
  * a Connection. Access to Connection and its methods is only via
  * LdapClient with two exceptions: SASL authentication and StartTLS.
  * SASL needs to access Connection's socket IO streams (in order to do encryption
  * of the security layer). StartTLS needs to replace the IO streams
  * and close the IO streams on nonfatal close. The code for SASL
  * authentication can be treated as being the same as from LdapClient
  * because the SASL code is only ever called from LdapClient, from
  * inside LdapClient's synchronized authenticate() method. StartTLS is called
  * directly by the application but should only occur when the underlying
  * connection is quiet.
  * <p>
  * In terms of synchronization, worry about data structures
  * used by the Connection thread because that usage might contend
  * with calls by the main threads (i.e., those that call LdapClient).
  * Main threads need to worry about contention with each other.
  * Fields that Connection thread uses:
  *     inStream - synced access and update; initialized in constructor;
  *           referenced outside class unsync'ed (by LdapSasl) only
  *           when connection is quiet
  *     traceFile, traceTagIn, traceTagOut - no sync; debugging only
  *     parent - no sync; initialized in constructor; no updates
  *     pendingRequests - sync
  *     pauseLock - per-instance lock;
  *     paused - sync via pauseLock (pauseReader())
  * Members used by main threads (LdapClient):
  *     host, port - unsync; read-only access for StartTLS and debug messages
  *     setBound(), setV3() - no sync; called only by LdapClient.authenticate(),
  *             which is a sync method called only when connection is "quiet"
  *     getMsgId() - sync
  *     writeRequest(), removeRequest(),findRequest(), abandonOutstandingReqs() -
  *             access to shared pendingRequests is sync
  *     writeRequest(),  abandonRequest(), ldapUnbind() - access to outStream sync
  *     cleanup() - sync
  *     readReply() - access to sock sync
  *     unpauseReader() - (indirectly via writeRequest) sync on pauseLock
  * Members used by SASL auth (main thread):
  *     inStream, outStream - no sync; used to construct new stream; accessed
  *             only when conn is "quiet" and not shared
  *     replaceStreams() - sync method
  * Members used by StartTLS:
  *     inStream, outStream - no sync; used to record the existing streams;
  *             accessed only when conn is "quiet" and not shared
  *     replaceStreams() - sync method
  * <p>
  * Handles anonymous, simple, and SASL bind for v3; anonymous and simple
  * for v2.
  * %%% made public for access by LdapSasl %%%
  *
  * @author Vincent Ryan
  * @author Rosanna Lee
  * @author Jagane Sundar
  */
public final class Connection implements Runnable {

    // When true, diagnostic messages are printed to System.err throughout
    // this class.
    private static final boolean debug = false;
    private static final int dump = 0; // > 0 r, > 1 rw


    // Reader thread that executes run(); created and started at the end of
    // the constructor.
    final private Thread worker;    // Initialized in constructor

    // true means LDAP v3, false means v2; consulted before encoding request
    // controls in abandonRequest() and ldapUnbind().
    private boolean v3 = true;       // Set in setV3()

    final public String host;  // used by LdapClient for generating exception messages
                         // used by StartTlsResponse when creating an SSL socket
    final public int port;     // used by LdapClient for generating exception messages
                         // used by StartTlsResponse when creating an SSL socket

    // When true, cleanup() sends an UNBIND before closing the socket.
    private boolean bound = false;   // Set in setBound()

    // All three are initialized in constructor and read-only afterwards
    // (they stay null unless a trace stream is passed to the constructor)
    private OutputStream traceFile = null;
    private String traceTagIn = null;
    private String traceTagOut = null;

    // Initialized in constructor; read and used externally (LdapSasl);
    // Updated in replaceStreams() during "quiet", unshared, period
    public InputStream inStream;   // must be public; used by LdapSasl

    // Initialized in constructor; read and used externally (LdapSasl);
    // Updated in replaceStreams() during "quiet", unshared, period
    public OutputStream outStream; // must be public; used by LdapSasl

    // Initialized in constructor; read and used externally (TLS) to
    // get new IO streams; closed during cleanup (and then set to null)
    public Socket sock;            // for TLS

    // For processing "disconnect" unsolicited notification
    // Initialized in constructor
    final private LdapClient parent;

    // Incremented and returned in sync getMsgId()
    private int outMsgId = 0;

    //
    // The list of ldapRequests pending on this binding
    // (singly linked through LdapRequest.next; new requests are added
    // at the head)
    //
    // Accessed only within sync methods
    private LdapRequest pendingRequests = null;

    // Set by writeRequest() to the IOException that made a write fail.
    volatile IOException closureReason = null;
    // Set to false at the start of cleanup().
    volatile boolean useable = true;  // is Connection still useable

160 161
    int readTimeout;
    int connectTimeout;
D
duke 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

    // true means v3; false means v2
    // Called in LdapClient.authenticate() (which is synchronized)
    // when connection is "quiet" and not shared; no need to synchronize
    void setV3(boolean v) {
        // Plain, unsynchronized field write: true selects LDAP v3,
        // false selects v2.
        this.v3 = v;
    }

    // A BIND request has been successfully made on this connection
    // When cleaning up, remember to do an UNBIND
    // Called in LdapClient.authenticate() (which is synchronized)
    // when connection is "quiet" and not shared; no need to synchronize
    void setBound() {
        // Remember that a BIND succeeded so that cleanup() sends an UNBIND.
        this.bound = true;
    }

    ////////////////////////////////////////////////////////////////////////////
    //
    // Create an LDAP Binding object and bind to a particular server
    //
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Opens a socket to the LDAP server, wraps its streams in buffered
     * streams and starts the daemon reader thread.
     *
     * @param parent the LdapClient that is called back when this connection
     *        closes itself
     * @param host server host name; also used in exception messages
     * @param port server port; also used in exception messages
     * @param socketFactory name of a socket factory class, or null to use a
     *        plain {@code java.net.Socket}
     * @param connectTimeout connect timeout handed to createSocket(); only
     *        values greater than zero are applied
     * @param readTimeout timeout (in ms) used by readReply(); only values
     *        greater than zero are applied
     * @param trace possibly null stream to which BER traffic is dumped
     * @throws NamingException a CommunicationException (root cause set) if
     *         the socket cannot be created or its streams cannot be obtained
     */
    Connection(LdapClient parent, String host, int port, String socketFactory,
        int connectTimeout, int readTimeout, OutputStream trace) throws NamingException {

        this.host = host;
        this.port = port;
        this.parent = parent;
        this.readTimeout = readTimeout;
        this.connectTimeout = connectTimeout;

        if (trace != null) {
            traceFile = trace;
            traceTagIn = "<- " + host + ":" + port + "\n\n";
            traceTagOut = "-> " + host + ":" + port + "\n\n";
        }

        //
        // Connect to server
        //
        try {
            sock = createSocket(host, port, socketFactory, connectTimeout);

            if (debug) {
                System.err.println("Connection: opening socket: " + host + "," + port);
            }

            inStream = new BufferedInputStream(sock.getInputStream());
            outStream = new BufferedOutputStream(sock.getOutputStream());

        } catch (Exception e) {
            // Catch-all: reflection and class loading in createSocket() may
            // throw IllegalArgumentException and similar, in addition to
            // the IO errors generated by socket creation.

            // If the socket was opened but its streams could not be
            // obtained, close it so that it is not leaked.
            if (sock != null) {
                try {
                    sock.close();
                } catch (IOException ignored) {
                    // best effort; the original failure is reported below
                }
                sock = null;
            }

            // Report the exception thrown by a reflective call rather than
            // its InvocationTargetException wrapper.
            Throwable rootCause = e;
            if (e instanceof InvocationTargetException) {
                rootCause = ((InvocationTargetException) e).getTargetException();
            }

            CommunicationException ce =
                new CommunicationException(host + ":" + port);
            ce.setRootCause(rootCause);
            throw ce;
        }

        worker = Obj.helper.createThread(this);
        worker.setDaemon(true);
        worker.start();
    }

    /*
     * Create an InetSocketAddress using the specified hostname and port number.
     */
    private Object createInetSocketAddress(String host, int port)
            throws NoSuchMethodException {

        try {
244
            Class<?> inetSocketAddressClass =
D
duke 已提交
245 246
                Class.forName("java.net.InetSocketAddress");

247 248
            Constructor<?> inetSocketAddressCons =
                inetSocketAddressClass.getConstructor(new Class<?>[]{
D
duke 已提交
249 250 251 252 253
                String.class, int.class});

            return inetSocketAddressCons.newInstance(new Object[]{
                host, new Integer(port)});

254 255 256 257
        } catch (ClassNotFoundException |
                 InstantiationException |
                 InvocationTargetException |
                 IllegalAccessException e) {
D
duke 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
            throw new NoSuchMethodException();

        }
    }

    /*
     * Create a Socket object using the specified socket factory and time limit.
     *
     * If a timeout is supplied and unconnected sockets are supported then
     * an unconnected socket is created and the timeout is applied when
     * connecting the socket. If a timeout is supplied but unconnected sockets
     * are not supported then the timeout is ignored and a connected socket
     * is created.
     */
    private Socket createSocket(String host, int port, String socketFactory,
            int connectTimeout) throws Exception {

        Socket socket = null;

        if (socketFactory != null) {

            // create the factory

281
            Class<?> socketFactoryClass = Obj.helper.loadClass(socketFactory);
D
duke 已提交
282
            Method getDefault =
283
                socketFactoryClass.getMethod("getDefault", new Class<?>[]{});
D
duke 已提交
284 285 286 287 288 289 290 291 292 293
            Object factory = getDefault.invoke(null, new Object[]{});

            // create the socket

            Method createSocket = null;

            if (connectTimeout > 0) {

                try {
                    createSocket = socketFactoryClass.getMethod("createSocket",
294
                        new Class<?>[]{});
D
duke 已提交
295 296

                    Method connect = Socket.class.getMethod("connect",
297
                        new Class<?>[]{Class.forName("java.net.SocketAddress"),
D
duke 已提交
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
                        int.class});
                    Object endpoint = createInetSocketAddress(host, port);

                    // unconnected socket
                    socket =
                        (Socket)createSocket.invoke(factory, new Object[]{});

                    if (debug) {
                        System.err.println("Connection: creating socket with " +
                            "a timeout using supplied socket factory");
                    }

                    // connected socket
                    connect.invoke(socket, new Object[]{
                        endpoint, new Integer(connectTimeout)});

                } catch (NoSuchMethodException e) {
                    // continue (but ignore connectTimeout)
                }
            }

            if (socket == null) {
                createSocket = socketFactoryClass.getMethod("createSocket",
321
                    new Class<?>[]{String.class, int.class});
D
duke 已提交
322 323 324 325 326 327 328 329 330 331 332 333 334 335

                if (debug) {
                    System.err.println("Connection: creating socket using " +
                        "supplied socket factory");
                }
                // connected socket
                socket = (Socket) createSocket.invoke(factory,
                    new Object[]{host, new Integer(port)});
            }
        } else {

            if (connectTimeout > 0) {

                try {
336 337
                    Constructor<Socket> socketCons =
                        Socket.class.getConstructor(new Class<?>[]{});
D
duke 已提交
338 339

                    Method connect = Socket.class.getMethod("connect",
340
                        new Class<?>[]{Class.forName("java.net.SocketAddress"),
D
duke 已提交
341 342 343
                        int.class});
                    Object endpoint = createInetSocketAddress(host, port);

344
                    socket = socketCons.newInstance(new Object[]{});
D
duke 已提交
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366

                    if (debug) {
                        System.err.println("Connection: creating socket with " +
                            "a timeout");
                    }
                    connect.invoke(socket, new Object[]{
                        endpoint, new Integer(connectTimeout)});

                } catch (NoSuchMethodException e) {
                    // continue (but ignore connectTimeout)
                }
            }

            if (socket == null) {
                if (debug) {
                    System.err.println("Connection: creating socket");
                }
                // connected socket
                socket = new Socket(host, port);
            }
        }

367 368 369 370 371 372 373 374 375 376 377 378 379
        // For LDAP connect timeouts on LDAP over SSL connections must treat
        // the SSL handshake following socket connection as part of the timeout.
        // So explicitly set a socket read timeout, trigger the SSL handshake,
        // then reset the timeout.
        if (connectTimeout > 0 && socket instanceof SSLSocket) {
            SSLSocket sslSocket = (SSLSocket) socket;
            int socketTimeout = sslSocket.getSoTimeout();

            sslSocket.setSoTimeout(connectTimeout); // reuse full timeout value
            sslSocket.startHandshake();
            sslSocket.setSoTimeout(socketTimeout);
        }

D
duke 已提交
380 381 382 383 384 385 386 387 388 389 390 391 392 393
        return socket;
    }

    ////////////////////////////////////////////////////////////////////////////
    //
    // Methods to IO to the LDAP server
    //
    ////////////////////////////////////////////////////////////////////////////

    synchronized int getMsgId() {
        // Advance the counter first, then hand out the new value.
        outMsgId += 1;
        return outMsgId;
    }

    LdapRequest writeRequest(BerEncoder ber, int msgId) throws IOException {
394
        return writeRequest(ber, msgId, false /* pauseAfterReceipt */, -1);
D
duke 已提交
395 396
    }

397 398 399 400 401 402 403
    LdapRequest writeRequest(BerEncoder ber, int msgId,
        boolean pauseAfterReceipt) throws IOException {
        // Convenience overload: -1 is passed through as the reply queue
        // capacity.
        final int replyQueueCapacity = -1;
        return writeRequest(ber, msgId, pauseAfterReceipt, replyQueueCapacity);
    }

    LdapRequest writeRequest(BerEncoder ber, int msgId,
        boolean pauseAfterReceipt, int replyQueueCapacity) throws IOException {
D
duke 已提交
404

405 406
        LdapRequest req =
            new LdapRequest(msgId, pauseAfterReceipt, replyQueueCapacity);
D
duke 已提交
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
        addRequest(req);

        if (traceFile != null) {
            Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0, ber.getDataLen());
        }


        // unpause reader so that it can get response
        // NOTE: Must do this before writing request, otherwise might
        // create a race condition where the writer unblocks its own response
        unpauseReader();

        if (debug) {
            System.err.println("Writing request to: " + outStream);
        }

        try {
            synchronized (this) {
                outStream.write(ber.getBuf(), 0, ber.getDataLen());
                outStream.flush();
            }
        } catch (IOException e) {
            cleanup(null, true);
            throw (closureReason = e); // rethrow
        }

        return req;
    }

    /**
     * Reads a reply; waits until one is ready.
     */
    BerDecoder readReply(LdapRequest ldr)
            throws IOException, NamingException {
        BerDecoder rber;

        // Time actually spent waiting. Object.wait() may return before the
        // timeout has elapsed (spurious wakeup, or a notify() that did not
        // deliver a reply), so an early return must not be treated as a
        // timeout; the wait is resumed for the remaining time instead.
        long elapsedMilli = 0;
        long elapsedNano = 0;

        while (((rber = ldr.getReplyBer()) == null) &&
                (readTimeout <= 0 || elapsedMilli < readTimeout)) {
            try {
                // If socket closed, don't even try
                synchronized (this) {
                    if (sock == null) {
                        throw new ServiceUnavailableException(host + ":" + port +
                            "; socket closed");
                    }
                }
                synchronized (ldr) {
                    // check if condition has changed since our last check
                    rber = ldr.getReplyBer();
                    if (rber != null) {
                        break;
                    }
                    if (readTimeout > 0) {  // Socket read timeout is specified
                        long beginNano = System.nanoTime();

                        // The loop condition guarantees the remaining time
                        // is positive, so this never becomes wait(0)
                        // (which would wait indefinitely).
                        ldr.wait(readTimeout - elapsedMilli);

                        // Carry the sub-millisecond remainder over so that
                        // repeated short waits do not lose time.
                        elapsedNano += System.nanoTime() - beginNano;
                        elapsedMilli += elapsedNano / 1000000L;
                        elapsedNano %= 1000000L;
                    } else {
                        ldr.wait(15 * 1000); // 15 second timeout
                    }
                }
            } catch (InterruptedException ex) {
                throw new InterruptedNamingException(
                    "Interrupted during LDAP operation");
            }
        }

        // Without a read timeout the loop only ends once a reply is
        // available, so a null reply here means the read timeout elapsed.
        if (rber == null) {
            removeRequest(ldr);
            throw new NamingException("LDAP response read timed out, timeout used:"
                            + readTimeout + "ms." );

        }
        return rber;
    }


    ////////////////////////////////////////////////////////////////////////////
    //
    // Methods to add, find, delete, and abandon requests made to server
    //
    ////////////////////////////////////////////////////////////////////////////

    private synchronized void addRequest(LdapRequest ldapRequest) {
        // Push onto the head of the pending list. With an empty list the
        // current head is null, so the new request's next becomes null.
        ldapRequest.next = pendingRequests;
        pendingRequests = ldapRequest;
    }

    synchronized LdapRequest findRequest(int msgId) {
        // Linear scan of the pending list; null when no request matches.
        for (LdapRequest cur = pendingRequests; cur != null; cur = cur.next) {
            if (cur.msgId == msgId) {
                return cur;
            }
        }
        return null;
    }

    synchronized void removeRequest(LdapRequest req) {
        LdapRequest prev = null;

        for (LdapRequest cur = pendingRequests; cur != null;
                prev = cur, cur = cur.next) {
            if (cur != req) {
                continue;
            }

            // Found it: cancel, unlink from the list, and detach its next
            // pointer. Nothing further can match, so stop here.
            cur.cancel();
            if (prev == null) {
                pendingRequests = cur.next;
            } else {
                prev.next = cur.next;
            }
            cur.next = null;
            return;
        }
    }

    void abandonRequest(LdapRequest ldr, Control[] reqCtls) {
        // Remove from queue
        removeRequest(ldr);

        BerEncoder ber = new BerEncoder(256);
        int abandonMsgId = getMsgId();

        //
        // build the abandon request.
        //
        try {
            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
                ber.encodeInt(abandonMsgId);
                ber.encodeInt(ldr.msgId, LdapClient.LDAP_REQ_ABANDON);

                if (v3) {
                    LdapClient.encodeControls(ber, reqCtls);
                }
            ber.endSeq();

            if (traceFile != null) {
                Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0,
                    ber.getDataLen());
            }

            synchronized (this) {
                outStream.write(ber.getBuf(), 0, ber.getDataLen());
                outStream.flush();
            }

        } catch (IOException ex) {
            //System.err.println("ldap.abandon: " + ex);
        }

571
        // Don't expect any response for the abandon request.
D
duke 已提交
572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
    }

    /**
     * Sends an abandon request for every request still on the pending list
     * and leaves the list empty.
     *
     * @param reqCtls possibly null request controls to accompany each
     *        abandon request
     */
    synchronized void abandonOutstandingReqs(Control[] reqCtls) {
        LdapRequest ldr = pendingRequests;

        while (ldr != null) {
            // abandonRequest() calls removeRequest(), which unlinks ldr and
            // sets ldr.next to null. The successor therefore has to be
            // captured first; reading ldr.next afterwards would end the loop
            // after the first request and silently drop the remaining ones.
            LdapRequest next = ldr.next;
            abandonRequest(ldr, reqCtls);
            ldr = next;
        }
        pendingRequests = null;
    }

    ////////////////////////////////////////////////////////////////////////////
    //
    // Methods to unbind from server and clear up resources when object is
    // destroyed.
    //
    ////////////////////////////////////////////////////////////////////////////

    private void ldapUnbind(Control[] reqCtls) {

        BerEncoder ber = new BerEncoder(256);
        int unbindMsgId = getMsgId();

        //
        // build the unbind request.
        //

        try {

            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
                ber.encodeInt(unbindMsgId);
                // IMPLICIT TAGS
                ber.encodeByte(LdapClient.LDAP_REQ_UNBIND);
                ber.encodeByte(0);

                if (v3) {
                    LdapClient.encodeControls(ber, reqCtls);
                }
            ber.endSeq();

            if (traceFile != null) {
                Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(),
                    0, ber.getDataLen());
            }

            synchronized (this) {
                outStream.write(ber.getBuf(), 0, ber.getDataLen());
                outStream.flush();
            }

        } catch (IOException ex) {
            //System.err.println("ldap.unbind: " + ex);
        }

626
        // Don't expect any response for the unbind request.
D
duke 已提交
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675
    }

    /**
     * @param reqCtls Possibly null request controls that accompanies the
     *    abandon and unbind LDAP request.
     * @param notifyParent true means to call parent LdapClient back, notifying
     *    it that the connection has been closed; false means not to notify
     *    parent. If LdapClient invokes cleanup(), notifyParent should be set to
     *    false because LdapClient already knows that it is closing
     *    the connection. If Connection invokes cleanup(), notifyParent should be
     *    set to true because LdapClient needs to know about the closure.
     */
    void cleanup(Control[] reqCtls, boolean notifyParent) {
        boolean nparent = false;

        synchronized (this) {
            useable = false;

            if (sock != null) {
                if (debug) {
                    System.err.println("Connection: closing socket: " + host + "," + port);
                }
                try {
                    if (!notifyParent) {
                        abandonOutstandingReqs(reqCtls);
                    }
                    if (bound) {
                        ldapUnbind(reqCtls);
                    }
                } finally {
                    try {
                        outStream.flush();
                        sock.close();
                        unpauseReader();
                    } catch (IOException ie) {
                        if (debug)
                            System.err.println("Connection: problem closing socket: " + ie);
                    }
                    if (!notifyParent) {
                        LdapRequest ldr = pendingRequests;
                        while (ldr != null) {
                            ldr.cancel();
                            ldr = ldr.next;
                        }
                    }
                    sock = null;
                }
                nparent = notifyParent;
            }
676 677 678 679 680 681 682 683 684
            if (nparent) {
                LdapRequest ldr = pendingRequests;
                while (ldr != null) {

                    synchronized (ldr) {
                        ldr.notify();
                        ldr = ldr.next;
                    }
                }
685
            }
D
duke 已提交
686
        }
687 688 689
        if (nparent) {
            parent.processConnectionClosure();
        }
D
duke 已提交
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
    }


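    // Assume everything is "quiet".
    // NOTE(review): this comment used to say that the method must not be
    // synchronized (risk of deadlock) and that a "streamLock" should be used
    // instead. The method is declared synchronized and no streamLock is
    // visible in this part of the file -- confirm which is intended.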

    synchronized public void replaceStreams(InputStream newIn, OutputStream newOut) {
        if (debug) {
            System.err.println("Replacing " + inStream + " with: " + newIn);
            System.err.println("Replacing " + outStream + " with: " + newOut);
        }

        // The input stream is swapped first.
        inStream = newIn;

        // Flush the outgoing stream before it is replaced; a flush failure
        // does not prevent the replacement.
        final OutputStream previousOut = outStream;
        try {
            previousOut.flush();
        } catch (IOException flushFailure) {
            if (debug) {
                System.err.println("Connection: cannot flush outstream: " + flushFailure);
            }
        }

        outStream = newOut;
    }

    /**
     * Used by Connection thread to read inStream into a local variable.
     * This ensures that there is no contention between the main thread
     * and the Connection thread when the main thread updates inStream.
     */
    private synchronized InputStream getInputStream() {
        // Read under the same monitor that replaceStreams() holds while
        // updating the field.
        return this.inStream;
    }


    ////////////////////////////////////////////////////////////////////////////
    //
    // Code for pausing/unpausing the reader thread ('worker')
    //
    ////////////////////////////////////////////////////////////////////////////

    /*
     * The main idea is to mark requests that need the reader thread to
     * pause after getting the response. When the reader thread gets the response,
     * it waits on a lock instead of returning to the read(). The next time a
     * request is sent, the reader is automatically unblocked if necessary.
     * Note that the reader must be unblocked BEFORE the request is sent.
     * Otherwise, there is a race condition where the request is sent and
     * the reader thread might read the response and be unblocked
     * by writeRequest().
     *
     * This pause gives the main thread (StartTLS or SASL) an opportunity to
     * update the reader's state (e.g., its streams) if necessary.
     * The assumption is that the connection will remain quiet during this pause
     * (i.e., no intervening requests being sent).
     *<p>
     * For dealing with StartTLS close,
     * when the read() exits either due to EOF or an exception,
     * the reader thread checks whether there is a new stream to read from.
     * If so, then it reattempts the read. Otherwise, the EOF or exception
     * is processed and the reader thread terminates.
     * In a StartTLS close, the client first replaces the SSL IO streams with
     * plain ones and then closes the SSL socket.
     * If the reader thread attempts to read, or was reading, from
     * the SSL socket (that is, it got to the read BEFORE replaceStreams()),
     * the SSL socket close will cause the reader thread to
     * get an EOF/exception and reexamine the input stream.
     * If the reader thread sees a new stream, it reattempts the read.
     * If the underlying socket is still alive, then the new read will succeed.
     * If the underlying socket has been closed also, then the new read will
     * fail and the reader thread exits.
     * If the reader thread attempts to read, or was reading, from the plain
     * socket (that is, it got to the read AFTER replaceStreams()), the
     * SSL socket close will have no effect on the reader thread.
     *
     * The check for a new stream is made only
     * on the first read of a BER buffer; the reader should
     * never be in the midst of reading a buffer when a nonfatal close occurs.
     * If this does occur, then the connection is in an inconsistent state and
     * the safest thing to do is to shut it down.
     */

    // Monitor used for the reader pause handshake: pauseReader() waits on it
    // and unpauseReader() notifies it. Both touch 'paused' only while holding it.
    private Object pauseLock = new Object();  // lock for reader to wait on while paused
    // Set to true by pauseReader() before it waits; cleared by unpauseReader()
    // just before it notifies the waiting reader.
    private boolean paused = false;           // paused state of reader

    /*
     * Unpauses reader thread if it was paused
     */
    private void unpauseReader() throws IOException {
        // Hold the pause monitor so that testing the flag, clearing it and
        // waking the waiter happen as one step relative to pauseReader().
        synchronized (pauseLock) {
            if (!paused) {
                return;     // reader is not paused; nothing to wake up
            }

            if (debug) {
                System.err.println("Unpausing reader; read from: " +
                                    inStream);
            }

            // Clear the flag first so the waiter's loop condition is false
            // when it wakes, then signal the thread blocked in pauseReader().
            paused = false;
            pauseLock.notify();
        }
    }

     /*
     * Pauses reader so that it stops reading from the input stream.
     * Reader blocks on pauseLock instead of read().
     * MUST be called from within synchronized (pauseLock) clause.
     */
    private void pauseReader() throws IOException {
        // Blocks the calling (reader) thread on pauseLock until unpauseReader()
        // clears 'paused'. The caller must already hold pauseLock; wait()
        // releases it while blocked and reacquires it before returning.
        //
        // Throws InterruptedIOException if the thread is interrupted while
        // waiting; the original InterruptedException is attached as the cause.
        if (debug) {
            System.err.println("Pausing reader;  was reading from: " +
                                inStream);
        }
        paused = true;
        try {
            // Re-test the flag after every wake-up so that a spurious wake-up
            // does not end the pause early.
            while (paused) {
                pauseLock.wait(); // notified by unpauseReader
            }
        } catch (InterruptedException e) {
            // wait() clears the interrupt status when it throws; restore it so
            // code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();

            // InterruptedIOException has no cause-taking constructor, so the
            // cause is attached explicitly instead of being dropped.
            InterruptedIOException interrupted = new InterruptedIOException(
                    "Pause/unpause reader has problems.");
            interrupted.initCause(e);
            throw interrupted;
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    //
    // The LDAP Binding thread. It does the mux/demux of multiple requests
    // on the same TCP connection.
    //
    ////////////////////////////////////////////////////////////////////////////


    public void run() {
        byte inbuf[];   // Buffer for reading incoming bytes
        int inMsgId;    // Message id of incoming response
        int bytesread;  // Number of bytes in inbuf
        int br;         // Temp; number of bytes read from stream
        int offset;     // Offset of where to store bytes in inbuf
        int seqlen;     // Length of ASN sequence
        int seqlenlen;  // Number of sequence length bytes
        boolean eos;    // End of stream
        BerDecoder retBer;    // Decoder for ASN.1 BER data from inbuf
        InputStream in = null;

        try {
            while (true) {
                try {
838 839
                    // type and length (at most 128 octets for long form)
                    inbuf = new byte[129];
D
duke 已提交
840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898

                    offset = 0;
                    seqlen = 0;
                    seqlenlen = 0;

                    in = getInputStream();

                    // check that it is the beginning of a sequence
                    bytesread = in.read(inbuf, offset, 1);
                    if (bytesread < 0) {
                        if (in != getInputStream()) {
                            continue;   // a new stream to try
                        } else {
                            break; // EOF
                        }
                    }

                    if (inbuf[offset++] != (Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR))
                        continue;

                    // get length of sequence
                    bytesread = in.read(inbuf, offset, 1);
                    if (bytesread < 0)
                        break; // EOF
                    seqlen = inbuf[offset++];

                    // if high bit is on, length is encoded in the
                    // subsequent length bytes and the number of length bytes
                    // is equal to & 0x80 (i.e. length byte with high bit off).
                    if ((seqlen & 0x80) == 0x80) {
                        seqlenlen = seqlen & 0x7f;  // number of length bytes

                        bytesread = 0;
                        eos = false;

                        // Read all length bytes
                        while (bytesread < seqlenlen) {
                            br = in.read(inbuf, offset+bytesread,
                                seqlenlen-bytesread);
                            if (br < 0) {
                                eos = true;
                                break; // EOF
                            }
                            bytesread += br;
                        }

                        // end-of-stream reached before length bytes are read
                        if (eos)
                            break;  // EOF

                        // Add contents of length bytes to determine length
                        seqlen = 0;
                        for( int i = 0; i < seqlenlen; i++) {
                            seqlen = (seqlen << 8) + (inbuf[offset+i] & 0xff);
                        }
                        offset += bytesread;
                    }

                    // read in seqlen bytes
899 900 901 902
                    byte[] left = IOUtils.readFully(in, seqlen, false);
                    inbuf = Arrays.copyOf(inbuf, offset + left.length);
                    System.arraycopy(left, 0, inbuf, offset, left.length);
                    offset += left.length;
D
duke 已提交
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
/*
if (dump > 0) {
System.err.println("seqlen: " + seqlen);
System.err.println("bufsize: " + offset);
System.err.println("bytesleft: " + bytesleft);
System.err.println("bytesread: " + bytesread);
}
*/


                    try {
                        retBer = new BerDecoder(inbuf, 0, offset);

                        if (traceFile != null) {
                            Ber.dumpBER(traceFile, traceTagIn, inbuf, 0, offset);
                        }

                        retBer.parseSeq(null);
                        inMsgId = retBer.parseInt();
                        retBer.reset(); // reset offset

                        boolean needPause = false;

                        if (inMsgId == 0) {
                            // Unsolicited Notification
                            parent.processUnsolicited(retBer);
                        } else {
                            LdapRequest ldr = findRequest(inMsgId);

                            if (ldr != null) {

                                /**
                                 * Grab pauseLock before making reply available
                                 * to ensure that reader goes into paused state
                                 * before writer can attempt to unpause reader
                                 */
                                synchronized (pauseLock) {
                                    needPause = ldr.addReplyBer(retBer);
                                    if (needPause) {
                                        /*
                                         * Go into paused state; release
                                         * pauseLock
                                         */
                                        pauseReader();
                                    }

                                    // else release pauseLock
                                }
                            } else {
                                // System.err.println("Cannot find" +
                                //              "LdapRequest for " + inMsgId);
                            }
                        }
                    } catch (Ber.DecodeException e) {
                        //System.err.println("Cannot parse Ber");
                    }
                } catch (IOException ie) {
                    if (debug) {
                        System.err.println("Connection: Inside Caught " + ie);
                        ie.printStackTrace();
                    }

                    if (in != getInputStream()) {
                        // A new stream to try
                        // Go to top of loop and continue
                    } else {
                        if (debug) {
                            System.err.println("Connection: rethrowing " + ie);
                        }
                        throw ie;  // rethrow exception
                    }
                }
            }

            if (debug) {
                System.err.println("Connection: end-of-stream detected: "
                    + in);
            }
        } catch (IOException ex) {
            if (debug) {
                System.err.println("Connection: Caught " + ex);
            }
            closureReason = ex;
        } finally {
            cleanup(null, true); // cleanup
        }
        if (debug) {
            System.err.println("Connection: Thread Exiting");
        }
    }


    // This code must be uncommented to run the LdapAbandonTest.
    /*public void sendSearchReqs(String dn, int numReqs) {
        int i;
        String attrs[] = null;
        for(i = 1; i <= numReqs; i++) {
            BerEncoder ber = new BerEncoder(2048);

            try {
            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
                ber.encodeInt(i);
                ber.beginSeq(LdapClient.LDAP_REQ_SEARCH);
                    ber.encodeString(dn == null ? "" : dn);
                    ber.encodeInt(0, LdapClient.LBER_ENUMERATED);
                    ber.encodeInt(3, LdapClient.LBER_ENUMERATED);
                    ber.encodeInt(0);
                    ber.encodeInt(0);
                    ber.encodeBoolean(true);
                    LdapClient.encodeFilter(ber, "");
                    ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
                        ber.encodeStringArray(attrs);
                    ber.endSeq();
                ber.endSeq();
            ber.endSeq();
            writeRequest(ber, i);
            //System.err.println("wrote request " + i);
            } catch (Exception ex) {
            //System.err.println("ldap.search: Caught " + ex + " building req");
            }

        }
    } */
}