From e65635d8f46f2d8ff161c515b29d14ce52c0ad40 Mon Sep 17 00:00:00 2001 From: "lei.yul" Date: Wed, 16 Sep 2020 10:22:59 +0800 Subject: [PATCH] [Wisp] Refactoring WispSocket iostream creation Summary: Refactoring WispSocket's InputStream and OutputStream creating code for readability. Test Plan: all wisp tests Reviewed-by: joeyleeeeeee97 shiyuexw Issue: alibaba/dragonwell8#124 --- .../classes/sun/nio/ch/WispSocketImpl.java | 400 +++++++++--------- 1 file changed, 207 insertions(+), 193 deletions(-) diff --git a/src/linux/classes/sun/nio/ch/WispSocketImpl.java b/src/linux/classes/sun/nio/ch/WispSocketImpl.java index 1a45331e6..c987080f5 100644 --- a/src/linux/classes/sun/nio/ch/WispSocketImpl.java +++ b/src/linux/classes/sun/nio/ch/WispSocketImpl.java @@ -171,6 +171,124 @@ public class WispSocketImpl } } + private class WispSocketInputStream extends InputStream { + WispSocketInputStream(SocketChannel ch) { + this.ch = ch; + } + + protected final SocketChannel ch; + private ByteBuffer bb = null; + // Invoker's previous array + private byte[] bs = null; + private byte[] b1 = null; + + private ByteBuffer readAhead = null; + + @Override + public int read() throws IOException { + if (b1 == null) { + b1 = new byte[1]; + } + int n = this.read(b1); + if (n == 1) + return b1[0] & 0xff; + return -1; + } + + @Override + public int read(byte[] bs, int off, int len) + throws IOException { + if (len <= 0 || off < 0 || off + len > bs.length) { + if (len == 0) { + return 0; + } + throw new ArrayIndexOutOfBoundsException(); + } + + ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs)); + + bb.limit(Math.min(off + len, bb.capacity())); + bb.position(off); + this.bb = bb; + this.bs = bs; + return read(bb); + } + + + private int read(ByteBuffer bb) throws IOException { + try { + wispSocketLockSupport.beginRead(); + return read0(bb); + } finally { + wispSocketLockSupport.endRead(); + } + } + + private int read0(ByteBuffer bb) + throws IOException { + int n; + try { + if (readAhead != null && readAhead.hasRemaining()) { + if (bb.remaining() >= readAhead.remaining()) { + n = readAhead.remaining(); + bb.put(readAhead); + } else { + n = bb.remaining(); + for (int i = 0; i < n; i++) { + bb.put(readAhead.get()); + } + } + return n; + } + + if ((n = ch.read(bb)) != 0) { + return n; + } + + if (so.getSoTimeout() > 0) { + WEA.addTimer(System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout())); + } + + do { + WEA.registerEvent(ch, SelectionKey.OP_READ); + WEA.park(-1); + + if (so.getSoTimeout() > 0 && WEA.isTimeout()) { + throw new SocketTimeoutException("time out"); + } + } while ((n = ch.read(bb)) == 0); + } finally { + if (so.getSoTimeout() > 0) { + WEA.cancelTimer(); + } + WEA.unregisterEvent(); + } + + return n; + } + + @Override + public int available() throws IOException { + if (readAhead == null) { + readAhead = ByteBuffer.allocate(4096); + } else if (readAhead.hasRemaining()) { + return readAhead.remaining(); + } + + readAhead.clear(); + ch.read(readAhead); + readAhead.flip(); + + return readAhead.remaining(); + } + + @Override + public void close() throws IOException { + WispSocketImpl.this.close(); + } + } + public InputStream getInputStream() throws IOException { if (isClosed()) throw new SocketException("Socket is closed"); @@ -183,119 +301,7 @@ public class WispSocketImpl socketInputStream = AccessController.doPrivileged( new PrivilegedExceptionAction() { public InputStream run() throws IOException { - return new InputStream() { - protected final SocketChannel ch = getChannelImpl(); - private ByteBuffer bb = null; - // Invoker's previous array - private byte[] bs = null; - private byte[] b1 = null; - - private ByteBuffer readAhead = null; - - @Override - public int read() throws IOException { - if (b1 == null) { - b1 = new byte[1]; - } - int n = this.read(b1); - if (n == 1) - return b1[0] & 0xff; - return -1; - } - - @Override - public int read(byte[] bs, int off, int len) - throws IOException { - if (len <= 0 || off < 0 || off + len > bs.length) { - if (len == 0) { - return 0; - } - throw new ArrayIndexOutOfBoundsException(); - } - - ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs)); - - bb.limit(Math.min(off + len, bb.capacity())); - bb.position(off); - this.bb = bb; - this.bs = bs; - return read(bb); - } - - - private int read(ByteBuffer bb) throws IOException { - try { - wispSocketLockSupport.beginRead(); - return read0(bb); - } finally { - wispSocketLockSupport.endRead(); - } - } - - private int read0(ByteBuffer bb) - throws IOException { - int n; - try { - if (readAhead != null && readAhead.hasRemaining()) { - if (bb.remaining() >= readAhead.remaining()) { - n = readAhead.remaining(); - bb.put(readAhead); - } else { - n = bb.remaining(); - for (int i = 0; i < n; i++) { - bb.put(readAhead.get()); - } - } - return n; - } - - if ((n = ch.read(bb)) != 0) { - return n; - } - - if (so.getSoTimeout() > 0) { - WEA.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout())); - } - - do { - WEA.registerEvent(ch, SelectionKey.OP_READ); - WEA.park(-1); - - if (so.getSoTimeout() > 0 && WEA.isTimeout()) { - throw new SocketTimeoutException("time out"); - - } - } while ((n = ch.read(bb)) == 0); - } finally { - if (so.getSoTimeout() > 0) { - WEA.cancelTimer(); - } - WEA.unregisterEvent(); - } - - return n; - } - - @Override - public int available() throws IOException { - if (readAhead == null) { - readAhead = ByteBuffer.allocate(4096); - } else if (readAhead.hasRemaining()) { - return readAhead.remaining(); - } - - readAhead.clear(); - ch.read(readAhead); - readAhead.flip(); - - return readAhead.remaining(); - } - - @Override - public void close() throws IOException { - WispSocketImpl.this.close(); - } - }; + return new WispSocketInputStream(getChannelImpl()); } }); } catch (java.security.PrivilegedActionException e) { @@ -305,6 +311,93 @@ public class WispSocketImpl return socketInputStream; } + private class WispSocketOutputStream extends OutputStream { + + WispSocketOutputStream(SocketChannel ch) { + this.ch = ch; + } + + protected final SocketChannel ch; + private ByteBuffer bb = null; + // Invoker's previous array + private byte[] bs = null; + private byte[] b1 = null; + + + @Override + public void write(int b) throws IOException { + if (b1 == null) { + b1 = new byte[1]; + } + b1[0] = (byte) b; + this.write(b1); + } + + @Override + public void write(byte[] bs, int off, int len) + throws IOException + { + if (len <= 0 || off < 0 || off + len > bs.length) { + if (len == 0) { + return; + } + throw new ArrayIndexOutOfBoundsException(); + } + ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs)); + bb.limit(Math.min(off + len, bb.capacity())); + bb.position(off); + this.bb = bb; + this.bs = bs; + + write(bb); + } + + private void write(ByteBuffer bb) throws IOException { + try { + wispSocketLockSupport.beginWrite(); + write0(bb); + } finally { + wispSocketLockSupport.endWrite(); + } + } + + private void write0(ByteBuffer bb) + throws IOException { + + try { + int writeLen = bb.remaining(); + if (ch.write(bb) == writeLen) { + return; + } + + if (so.getSoTimeout() > 0) { + WEA.addTimer(System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout())); + } + + do { + WEA.registerEvent(ch, SelectionKey.OP_WRITE); + WEA.park(-1); + + if (so.getSoTimeout() > 0 && WEA.isTimeout()) { + throw new SocketTimeoutException("time out"); + } + ch.write(bb); + } while (bb.remaining() > 0); + } finally { + if (so.getSoTimeout() > 0) { + WEA.cancelTimer(); + } + WEA.unregisterEvent(); + } + } + + @Override + public void close() throws IOException { + WispSocketImpl.this.close(); + } + } + public OutputStream getOutputStream() throws IOException { if (isClosed()) throw new SocketException("Socket is closed"); @@ -316,86 +409,7 @@ public class WispSocketImpl return AccessController.doPrivileged( new PrivilegedExceptionAction() { public OutputStream run() throws IOException { - return new OutputStream() { - protected final SocketChannel ch = getChannelImpl(); - private ByteBuffer bb = null; - // Invoker's previous array - private byte[] bs = null; - private byte[] b1 = null; - - - @Override - public void write(int b) throws IOException { - if (b1 == null) { - b1 = new byte[1]; - } - b1[0] = (byte) b; - this.write(b1); - } - - @Override - public void write(byte[] bs, int off, int len) - throws IOException - { - if (len <= 0 || off < 0 || off + len > bs.length) { - if (len == 0) { - return; - } - throw new ArrayIndexOutOfBoundsException(); - } - ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs)); - bb.limit(Math.min(off + len, bb.capacity())); - bb.position(off); - this.bb = bb; - this.bs = bs; - - write(bb); - } - - private void write(ByteBuffer bb) throws IOException { - try { - wispSocketLockSupport.beginWrite(); - write0(bb); - } finally { - wispSocketLockSupport.endWrite(); - } - } - - private void write0(ByteBuffer bb) - throws IOException { - - try { - int writeLen = bb.remaining(); - if (ch.write(bb) == writeLen) { - return; - } - - if (so.getSoTimeout() > 0) { - WEA.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout())); - } - - do { - WEA.registerEvent(ch, SelectionKey.OP_WRITE); - WEA.park(-1); - - if (so.getSoTimeout() > 0 && WEA.isTimeout()) { - throw new SocketTimeoutException("time out"); - } - ch.write(bb); - } while (bb.remaining() > 0); - } finally { - if (so.getSoTimeout() > 0) { - WEA.cancelTimer(); - } - WEA.unregisterEvent(); - } - } - - @Override - public void close() throws IOException { - WispSocketImpl.this.close(); - } - }; + return new WispSocketOutputStream(getChannelImpl()); } }); } catch (java.security.PrivilegedActionException e) { -- GitLab