Commit 349cbe55 authored by chegar

6720866: Slow performance using HttpURLConnection for upload

Reviewed-by: michaelm
Parent 37f4867b
@@ -29,32 +29,58 @@ import java.io.*;
 /**
  * OutputStream that sends the output to the underlying stream using chunked
  * encoding as specified in RFC 2068.
- *
- * @author Alan Bateman
  */
 public class ChunkedOutputStream extends PrintStream {

     /* Default chunk size (including chunk header) if not specified */
     static final int DEFAULT_CHUNK_SIZE = 4096;
+    private static final byte[] CRLF = {'\r', '\n'};
+    private static final int CRLF_SIZE = CRLF.length;
+    private static final byte[] FOOTER = CRLF;
+    private static final int FOOTER_SIZE = CRLF_SIZE;
+    private static final byte[] EMPTY_CHUNK_HEADER = getHeader(0);
+    private static final int EMPTY_CHUNK_HEADER_SIZE = getHeaderSize(0);

     /* internal buffer */
     private byte buf[];
+    /* size of data (excluding footers and headers) already stored in buf */
+    private int size;
+    /* current index in buf (i.e. buf[count] */
     private int count;
+    /* number of bytes to be filled up to complete a data chunk
+     * currently being built */
+    private int spaceInCurrentChunk;

     /* underlying stream */
     private PrintStream out;

     /* the chunk size we use */
-    private int preferredChunkSize;
-
-    /* if the users write buffer is bigger than this size, we
-     * write direct from the users buffer instead of copying
-     */
-    static final int MAX_BUF_SIZE = 10 * 1024;
+    private int preferredChunkDataSize;
+    private int preferedHeaderSize;
+    private int preferredChunkGrossSize;
+    /* header for a complete Chunk */
+    private byte[] completeHeader;

     /* return the size of the header for a particular chunk size */
-    private int headerSize(int size) {
-        return 2 + (Integer.toHexString(size)).length();
+    private static int getHeaderSize(int size) {
+        return (Integer.toHexString(size)).length() + CRLF_SIZE;
     }

+    /* return a header for a particular chunk size */
+    private static byte[] getHeader(int size) {
+        try {
+            String hexStr = Integer.toHexString(size);
+            byte[] hexBytes = hexStr.getBytes("US-ASCII");
+            byte[] header = new byte[getHeaderSize(size)];
+            for (int i = 0; i < hexBytes.length; i++)
+                header[i] = hexBytes[i];
+            header[hexBytes.length] = CRLF[0];
+            header[hexBytes.length + 1] = CRLF[1];
+            return header;
+        } catch (java.io.UnsupportedEncodingException e) {
+            /* This should never happen */
+            throw new InternalError(e.getMessage());
+        }
+    }
+
     public ChunkedOutputStream(PrintStream o) {
@@ -63,111 +89,114 @@ public class ChunkedOutputStream extends PrintStream {
     public ChunkedOutputStream(PrintStream o, int size) {
         super(o);
         out = o;

         if (size <= 0) {
             size = DEFAULT_CHUNK_SIZE;
         }

         /* Adjust the size to cater for the chunk header - eg: if the
          * preferred chunk size is 1k this means the chunk size should
-         * be 1019 bytes (differs by 5 from preferred size because of
-         * 3 bytes for chunk size in hex and CRLF).
+         * be 1017 bytes (differs by 7 from preferred size because of
+         * 3 bytes for chunk size in hex and CRLF (header) and CRLF (footer)).
+         *
+         * If headerSize(adjusted_size) is shorter than headerSize(size)
+         * then try to use the extra byte, unless headerSize(adjusted_size+1)
+         * increases back to headerSize(size).
          */
         if (size > 0) {
-            int adjusted_size = size - headerSize(size);
-            if (adjusted_size + headerSize(adjusted_size) < size) {
+            int adjusted_size = size - getHeaderSize(size) - FOOTER_SIZE;
+            if (getHeaderSize(adjusted_size + 1) < getHeaderSize(size)) {
                 adjusted_size++;
             }
             size = adjusted_size;
         }

         if (size > 0) {
-            preferredChunkSize = size;
+            preferredChunkDataSize = size;
         } else {
-            preferredChunkSize = DEFAULT_CHUNK_SIZE - headerSize(DEFAULT_CHUNK_SIZE);
+            preferredChunkDataSize = DEFAULT_CHUNK_SIZE -
+                    getHeaderSize(DEFAULT_CHUNK_SIZE) - FOOTER_SIZE;
         }

+        preferedHeaderSize = getHeaderSize(preferredChunkDataSize);
+        preferredChunkGrossSize = preferedHeaderSize + preferredChunkDataSize
+                + FOOTER_SIZE;
+        completeHeader = getHeader(preferredChunkDataSize);
+
         /* start with an initial buffer */
-        buf = new byte[preferredChunkSize + 32];
+        buf = new byte[preferredChunkDataSize + 32];
+        reset();
     }

     /*
-     * If flushAll is true, then all data is flushed in one chunk.
-     *
-     * If false and the size of the buffer data exceeds the preferred
-     * chunk size then chunks are flushed to the output stream.
-     * If there isn't enough data to make up a complete chunk,
-     * then the method returns.
+     * Flush a buffered, completed chunk to the underlying stream. If the data
+     * in the buffer is insufficient to build up a chunk of "preferredChunkSize"
+     * then the data does not get flushed unless flushAll is true. If flushAll
+     * is true then the remaining data builds up a last chunk whose size is
+     * smaller than preferredChunkSize, and that last chunk gets flushed to the
+     * underlying stream. If flushAll is true and there is no data in the
+     * buffer at all then an empty chunk (containing a header only) gets
+     * flushed to the underlying stream.
      */
-    private void flush(byte[] buf, boolean flushAll) {
-        flush (buf, flushAll, 0);
-    }
-
-    private void flush(byte[] buf, boolean flushAll, int offset) {
-        int chunkSize;
-
-        do {
-            if (count < preferredChunkSize) {
-                if (!flushAll) {
-                    break;
-                }
-                chunkSize = count;
-            } else {
-                chunkSize = preferredChunkSize;
-            }
-
-            byte[] bytes = null;
-            try {
-                bytes = (Integer.toHexString(chunkSize)).getBytes("US-ASCII");
-            } catch (java.io.UnsupportedEncodingException e) {
-                //This should never happen.
-                throw new InternalError(e.getMessage());
-            }
-
-            out.write(bytes, 0, bytes.length);
-            out.write((byte)'\r');
-            out.write((byte)'\n');
-            if (chunkSize > 0) {
-                out.write(buf, offset, chunkSize);
-                out.write((byte)'\r');
-                out.write((byte)'\n');
-            }
-            out.flush();
-            if (checkError()) {
-                break;
-            }
-            if (chunkSize > 0) {
-                count -= chunkSize;
-                offset += chunkSize;
-            }
-        } while (count > 0);
-
-        if (!checkError() && count > 0) {
-            System.arraycopy(buf, offset, this.buf, 0, count);
+    private void flush(boolean flushAll) {
+        if (spaceInCurrentChunk == 0) {
+            /* flush a completed chunk to the underlying stream */
+            out.write(buf, 0, preferredChunkGrossSize);
+            out.flush();
+            reset();
+        } else if (flushAll) {
+            /* complete the last chunk and flush it to the underlying stream */
+            if (size > 0) {
+                /* adjust the header start index in case the header of the last
+                 * chunk is shorter than preferedHeaderSize */
+                int adjustedHeaderStartIndex = preferedHeaderSize -
+                        getHeaderSize(size);
+                /* write header */
+                System.arraycopy(getHeader(size), 0, buf,
+                        adjustedHeaderStartIndex, getHeaderSize(size));
+                /* write footer */
+                buf[count++] = FOOTER[0];
+                buf[count++] = FOOTER[1];
+                // send the last chunk to the underlying stream
+                out.write(buf, adjustedHeaderStartIndex,
+                        count - adjustedHeaderStartIndex);
+            } else {
+                // send an empty chunk (containing just a header) to the underlying stream
+                out.write(EMPTY_CHUNK_HEADER, 0, EMPTY_CHUNK_HEADER_SIZE);
+            }
+
+            out.flush();
+            reset();
         }
     }

+    @Override
     public boolean checkError() {
         return out.checkError();
     }

-    /*
-     * Check if we have enough data for a chunk and if so flush to the
-     * underlying output stream.
-     */
-    private void checkFlush() {
-        if (count >= preferredChunkSize) {
-            flush(buf, false);
-        }
-    }
-
     /* Check that the output stream is still open */
     private void ensureOpen() {
         if (out == null)
             setError();
     }

+    /*
+     * Writes data from b[] to an internal buffer and stores the data as data
+     * chunks of the following format: {Data length in Hex}{CRLF}{data}{CRLF}
+     * The size of the data is preferredChunkSize. As soon as a complete chunk
+     * has been read from b[], reading from b[] is suspended, the chunk gets
+     * flushed to the underlying stream, and then reading from b[] continues.
+     * When there is not enough data left in b[] to build up a chunk of
+     * preferredChunkSize, the data gets stored as an incomplete chunk of the
+     * following format: {space for data length}{CRLF}{data}
+     * The size of the data is of course smaller than preferredChunkSize.
+     */
+    @Override
     public synchronized void write(byte b[], int off, int len) {
         ensureOpen();
         if ((off < 0) || (off > b.length) || (len < 0) ||
@@ -177,81 +206,95 @@ public class ChunkedOutputStream extends PrintStream {
             return;
         }

-        int l = preferredChunkSize - count;
-
-        if ((len > MAX_BUF_SIZE) && (len > l)) {
-            /* current chunk is empty just write the data */
-            if (count == 0) {
-                count = len;
-                flush (b, false, off);
-                return;
-            }
-
-            /* first finish the current chunk */
-            if (l > 0) {
-                System.arraycopy(b, off, buf, count, l);
-                count = preferredChunkSize;
-                flush(buf, false);
-            }
-            count = len - l;
-
-            /* Now write the rest of the data */
-            flush (b, false, l+off);
-        } else {
-            int newcount = count + len;
-
-            if (newcount > buf.length) {
-                byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
-                System.arraycopy(buf, 0, newbuf, 0, count);
-                buf = newbuf;
-            }
-            System.arraycopy(b, off, buf, count, len);
-            count = newcount;
-            checkFlush();
-        }
+        /* if b[] contains enough data then one loop cycle creates one complete
+         * data chunk with a header, body and footer, and then flushes the
+         * chunk to the underlying stream. Otherwise, the last loop cycle
+         * creates an incomplete data chunk with an empty header and no footer
+         * and stores this incomplete chunk in the internal buffer buf[]
+         */
+        int bytesToWrite = len;
+        int inputIndex = off;   /* the index of the byte[] currently being written */
+
+        do {
+            /* enough data to complete a chunk */
+            if (bytesToWrite >= spaceInCurrentChunk) {
+
+                /* header */
+                for (int i = 0; i < completeHeader.length; i++)
+                    buf[i] = completeHeader[i];
+
+                /* data */
+                System.arraycopy(b, inputIndex, buf, count, spaceInCurrentChunk);
+                inputIndex += spaceInCurrentChunk;
+                bytesToWrite -= spaceInCurrentChunk;
+                count += spaceInCurrentChunk;
+
+                /* footer */
+                buf[count++] = FOOTER[0];
+                buf[count++] = FOOTER[1];
+                spaceInCurrentChunk = 0; // chunk is complete
+
+                flush(false);
+                if (checkError()) {
+                    break;
+                }
+            }
+
+            /* not enough data to build a chunk */
+            else {
+                /* header */
+                /* do not write a header if there are not enough bytes to build a chunk yet */
+
+                /* data */
+                System.arraycopy(b, inputIndex, buf, count, bytesToWrite);
+                count += bytesToWrite;
+                size += bytesToWrite;
+                spaceInCurrentChunk -= bytesToWrite;
+                bytesToWrite = 0;
+
+                /* footer */
+                /* do not write a footer if there are not enough bytes to build a chunk yet */
+            }
+        } while (bytesToWrite > 0);
     }

-    public synchronized void write(int b) {
-        ensureOpen();
-        int newcount = count + 1;
-        if (newcount > buf.length) {
-            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
-            System.arraycopy(buf, 0, newbuf, 0, count);
-            buf = newbuf;
-        }
-        buf[count] = (byte)b;
-        count = newcount;
-        checkFlush();
+    @Override
+    public synchronized void write(int _b) {
+        byte b[] = {(byte)_b};
+        write(b, 0, 1);
     }

     public synchronized void reset() {
-        count = 0;
+        count = preferedHeaderSize;
+        size = 0;
+        spaceInCurrentChunk = preferredChunkDataSize;
     }

     public int size() {
-        return count;
+        return size;
     }

+    @Override
     public synchronized void close() {
         ensureOpen();

         /* if we have a buffered chunk, send it */
-        if (count > 0) {
-            flush(buf, true);
+        if (size > 0) {
+            flush(true);
         }

         /* send a zero length chunk */
-        flush(buf, true);
+        flush(true);

         /* don't close the underlying stream */
         out = null;
     }

+    @Override
     public synchronized void flush() {
         ensureOpen();
-        if (count > 0) {
-            flush(buf, true);
+        if (size > 0) {
+            flush(true);
         }
     }
 }
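
To illustrate the behaviour the rewritten class is aiming for, here is a small, hypothetical test harness (not part of this commit). It assumes the class lives in the sun.net.www.http package as in the JDK source tree and that this internal package is accessible on a pre-modules JDK; the chunk size of 16 and the payload string are arbitrary choices. With a requested size of 16 the constructor settles on 11 data bytes per chunk (a "b" + CRLF header, 11 bytes of data, and a CRLF footer), so a 21-byte payload should come out as one full chunk, one short final chunk, and the terminating zero-length chunk.

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import sun.net.www.http.ChunkedOutputStream;   // assumed package; internal JDK API

public class ChunkedOutputStreamDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        // Request a gross chunk size of 16 bytes; the constructor adjusts this
        // down to 11 data bytes so that header + data + footer fit in 16 bytes.
        ChunkedOutputStream cos = new ChunkedOutputStream(new PrintStream(sink), 16);

        byte[] payload = "Hello, chunked world!".getBytes("US-ASCII");   // 21 bytes
        cos.write(payload, 0, payload.length);   // emits one complete 16-byte chunk
        cos.close();                             // emits the short last chunk, then a zero-length chunk

        // Expected wire format (concatenated):
        //   b\r\nHello, chun\r\n   a\r\nked world!\r\n   0\r\n
        System.out.print(sink.toString("US-ASCII"));
    }
}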