提交 e6cc4a55 编写于 作者: V Vlad Ilyushchenko

PG: send flow control is good, we need to test send flow control on every new...

PG: send flow control is good, we need to test send flow control on every new feature we add though. Next is to send query result using undersized buffer.
上级 703c8557
......@@ -77,9 +77,11 @@ public class WireParser implements Closeable {
private final DirectByteCharSequence dbcs = new DirectByteCharSequence();
private final BindVariableService bindVariableService = new BindVariableService();
private RecordCursor currentCursor = null;
private boolean sendCurrentCursorTail = false;
private long sendBufferPtr;
private boolean loggedIn = false;
private long recvBufferOffset = 0;
private boolean loginRequestProcessed = false;
private long recvBufferWriteOffset = 0;
private long recvBufferReadOffset = 0;
private int bufferRemainingOffset = 0;
private int bufferRemainingSize = 0;
private RecordCursorFactory currentFactory = null;
......@@ -107,68 +109,76 @@ public class WireParser implements Closeable {
}
public void process(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException, PeerIsSlowToWriteException {
final int remaining = (int) (recvBufferSize - recvBufferOffset);
if (remaining < 1) {
throw new RuntimeException("buffer overflow");
if (bufferRemainingSize > 0) {
doSend(fd, bufferRemainingOffset, bufferRemainingSize);
}
int n = doReceive(fd, remaining);
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
if (sendCurrentCursorTail) {
sendExecuteTail(fd);
}
if (n == 0) {
int retriesRemaining = idleRecvCountBeforeGivingUp;
while (retriesRemaining > 0) {
n = doReceive(fd, remaining);
if (n == 0) {
retriesRemaining--;
continue;
}
// If we have empty buffer we need to try to read something from socket
// however the opposite is a little tricky. If buffer is non-empty
// we still may need to read from socket if contents of this buffer
// is incomplete and cannot be parsed
if (recvBufferReadOffset == recvBufferWriteOffset) {
recv(fd);
}
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
long readOffsetBeforeParse = recvBufferReadOffset;
break;
}
// Parse will update the value of recvBufferOffset upon completion of
// logical block. We cannot count on return value because 'parse' may try to
// respond to client and fail with exception. When it does fail we would have
// to retry 'send' but not parse the same input again
parse(fd, recvBuffer + recvBufferReadOffset, (int) (recvBufferWriteOffset - recvBufferReadOffset));
if (retriesRemaining == 0) {
throw PeerIsSlowToWriteException.INSTANCE;
// nothing changed?
if (readOffsetBeforeParse == recvBufferReadOffset) {
// how come we have something in buffer and parse didn't do anything?
if (readOffsetBeforeParse < recvBufferWriteOffset) {
// may be content was incomplete?
recv(fd);
// still nothing? oh well
if (readOffsetBeforeParse == recvBufferReadOffset) {
return;
}
// at this point we have some contact and parse did do something
} else {
return;
}
}
int totalLen = (int) (n + recvBufferOffset);
int parsed = parse(fd, recvBuffer, totalLen);
if (parsed == 0) {
recvBufferOffset += n;
} else if (parsed < totalLen) {
int offset = parsed;
while (true) {
int len = totalLen - offset;
parsed = parse(fd, recvBuffer + offset, len);
if (parsed == 0) {
// we do not pre-compute length because 'parse' will mutate 'recvBufferReadOffset'
if (recvBufferWriteOffset - recvBufferReadOffset > 0) {
// did we not parse input fully?
do {
readOffsetBeforeParse = recvBufferReadOffset;
parse(fd, recvBuffer + recvBufferReadOffset, (int) (recvBufferWriteOffset - recvBufferReadOffset));
// nothing changed?
if (readOffsetBeforeParse == recvBufferReadOffset) {
// shift to start
Unsafe.getUnsafe().copyMemory(recvBuffer + offset, recvBuffer, len);
recvBufferOffset = len;
Unsafe.getUnsafe().copyMemory(recvBuffer + readOffsetBeforeParse, recvBuffer, recvBufferWriteOffset - readOffsetBeforeParse);
recvBufferWriteOffset = recvBufferWriteOffset - readOffsetBeforeParse;
recvBufferReadOffset = 0;
// read more
break;
} else if (parsed < (totalLen - offset)) {
offset += parsed;
} else {
recvBufferOffset = 0;
break;
return;
}
}
} else {
recvBufferOffset = 0;
} while (recvBufferReadOffset < recvBufferWriteOffset);
}
recvBufferWriteOffset = 0;
recvBufferReadOffset = 0;
}
private void disconnectClient(long fd) {
nf.close(fd);
loginRequestProcessed = false;
}
private int doReceive(long fd, int remaining) {
int n = nf.recv(fd, recvBuffer + recvBufferOffset, remaining);
dumpBuffer('>', recvBuffer + recvBufferOffset, n);
int n = nf.recv(fd, recvBuffer + recvBufferWriteOffset, remaining);
dumpBuffer('>', recvBuffer + recvBufferWriteOffset, n);
return n;
}
......@@ -178,9 +188,19 @@ public class WireParser implements Closeable {
}
}
private void disconnectClient(long fd) {
nf.close(fd);
loggedIn = false;
private void doSend(long fd, int offset, int size) throws PeerDisconnectedException, PeerIsSlowToReadException {
final int n = nf.send(fd, sendBuffer + offset, size);
dumpBuffer('<', sendBuffer, n);
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
if (n < size) {
doSendWithRetries(fd, n, size - n);
}
sendBufferPtr = sendBuffer;
bufferRemainingSize = 0;
bufferRemainingOffset = 0;
}
private void doSendWithRetries(long fd, int bufferOffset, int bufferSize) throws PeerDisconnectedException, PeerIsSlowToReadException {
......@@ -194,6 +214,8 @@ public class WireParser implements Closeable {
throw PeerDisconnectedException.INSTANCE;
}
dumpBuffer('<', sendBuffer, m);
if (m > 0) {
remaining -= m;
offset += m;
......@@ -221,12 +243,13 @@ public class WireParser implements Closeable {
* int the buffer they need to be passed again in parse function along with
* any additional bytes received
*/
private int parse(long fd, long address, int len) throws PeerDisconnectedException, PeerIsSlowToReadException {
private void parse(long fd, long address, int len) throws PeerDisconnectedException, PeerIsSlowToReadException {
long limit = address + len;
final int remaining = (int) (limit - address);
if (!loggedIn) {
return processLogin(fd, address, limit, remaining);
if (!loginRequestProcessed) {
processLogin(fd, address, limit, remaining);
return;
}
// this is a type-prefixed message
......@@ -234,7 +257,7 @@ public class WireParser implements Closeable {
if (remaining < AbstractTypePrefixedHeader.LEN) {
// we need to be able to read header and length
return 0;
return;
}
final int msgLen = AbstractTypePrefixedHeader.getLen(address);
......@@ -242,9 +265,12 @@ public class WireParser implements Closeable {
// msgLen does not take into account type byte
if (msgLen > remaining - 1) {
return 0;
return;
}
// we have enough to read entire message
recvBufferReadOffset += msgLen + 1;
final byte type = AbstractTypePrefixedHeader.getType(address);
LOG.info().$("got msg '").$((char) type).$('\'').$();
......@@ -293,8 +319,8 @@ public class WireParser implements Closeable {
parseQuery(fd, dbcs);
if (currentFactory == null) {
prepareReadyForQuery();
send(fd);
LOG.info().$("executed DDL").$();
send(fd);
}
break;
case 'X':
......@@ -304,6 +330,7 @@ public class WireParser implements Closeable {
case 'C':
// close
// todo: read what we are closing
currentFactory = null;
responseAsciiSink.put('3'); // close complete
responseAsciiSink.putNetworkInt(Integer.BYTES);
send(fd);
......@@ -319,11 +346,7 @@ public class WireParser implements Closeable {
LOG.info().$("executing query").$();
currentCursor = currentFactory.getCursor();
sendCursor(fd, currentCursor, currentFactory.getMetadata());
currentCursor.close();
prepareCommandComplete();
prepareReadyForQuery();
send(fd);
LOG.info().$("executed query").$();
sendExecuteTail(fd);
}
break;
case 'S': // sync?
......@@ -340,7 +363,70 @@ public class WireParser implements Closeable {
LOG.error().$("unknown message").$();
assert false;
}
return msgLen + 1;
}
private void processLogin(long fd, long address, long limit, int remaining) throws PeerDisconnectedException, PeerIsSlowToReadException {
int msgLen;
long msgLimit;// expect startup request
if (remaining < 4) {
return;
}
// there is data for length
// this is quite specific to message type :(
msgLen = StartupMessage.getLen(address); // postgesql includes length bytes in length of message
// do we have the rest of the message?
if (msgLen > remaining) {
// we have length - get the rest when ready
return;
}
// enough to read login request
recvBufferReadOffset += msgLen;
loginRequestProcessed = true;
// consume message
// process protocol
int protocol = StartupMessage.getProtocol(address);
// todo: validate protocol, see 'NegotiateProtocolVersion'
// extract properties
msgLimit = address + msgLen;
long lo = address + 8; // 8 is offset where name value pairs begin
// there is an extra byte at the end and it has to be 0
while (lo < msgLimit - 1) {
long hi = getStringLength(lo, msgLimit);
// todo: close connection when protocol is broken
assert hi > -1;
CharSequence name = new DirectByteCharSequence().of(lo, hi);
// name is ready
lo = hi + 1;
hi = getStringLength(lo, msgLimit);
assert hi > -1;
CharSequence value = new DirectByteCharSequence().of(lo, hi);
lo = hi + 1;
LOG.info()
.$("protocol [major=").$(protocol >> 16)
.$(", minor=").$((short) protocol)
.$(", name=").$(name)
.$(", value=").$(value)
.$(']').$();
}
// todo: close connection if protocol is violated
assert Unsafe.getUnsafe().getByte(lo) == 0;
// todo: check that there is no more data sent
assert lo + 1 == limit;
sendClearTextPasswordChallenge(fd);
}
private long getStringLength(long x, long limit) {
......@@ -428,70 +514,39 @@ public class WireParser implements Closeable {
responseAsciiSink.put('I');
}
private int processLogin(long fd, long address, long limit, int remaining) throws PeerDisconnectedException, PeerIsSlowToReadException {
int msgLen;
long msgLimit;// expect startup request
if (remaining < 4) {
return 0;
}
// there is data for length
// this is quite specific to message type :(
msgLen = StartupMessage.getLen(address); // postgesql includes length bytes in length of message
private void recv(long fd) throws PeerDisconnectedException, PeerIsSlowToWriteException {
final int remaining = (int) (recvBufferSize - recvBufferWriteOffset);
// do we have the rest of the message?
if (msgLen > remaining) {
// we have length - get the rest when ready
return 0;
if (remaining < 1) {
throw new RuntimeException("buffer overflow");
}
// 'StartupMessage'
// consume message
// process protocol
int protocol = StartupMessage.getProtocol(address);
// todo: validate protocol, see 'NegotiateProtocolVersion'
// extract properties
msgLimit = address + msgLen;
long lo = address + 8; // 8 is offset where name value pairs begin
// there is an extra byte at the end and it has to be 0
while (lo < msgLimit - 1) {
long hi = getStringLength(lo, msgLimit);
// todo: close connection when protocol is broken
assert hi > -1;
CharSequence name = new DirectByteCharSequence().of(lo, hi);
// name is ready
int n = doReceive(fd, remaining);
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
lo = hi + 1;
if (n == 0) {
int retriesRemaining = idleRecvCountBeforeGivingUp;
while (retriesRemaining > 0) {
n = doReceive(fd, remaining);
if (n == 0) {
retriesRemaining--;
continue;
}
hi = getStringLength(lo, msgLimit);
assert hi > -1;
CharSequence value = new DirectByteCharSequence().of(lo, hi);
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
lo = hi + 1;
break;
}
LOG.info()
.$("protocol [major=").$(protocol >> 16)
.$(", minor=").$((short) protocol)
.$(", name=").$(name)
.$(", value=").$(value)
.$(']').$();
if (retriesRemaining == 0) {
throw PeerIsSlowToWriteException.INSTANCE;
}
}
// todo: close connection if protocol is violated
assert Unsafe.getUnsafe().getByte(lo) == 0;
// todo: check that there is no more data sent
assert lo + 1 == limit;
sendClearTextPasswordChallenge(fd);
loggedIn = true;
// return msgLen as is because on login message length is all inclusive
return msgLen;
recvBufferWriteOffset += n;
}
private void prepareRowDescription(RecordMetadata metadata) {
......@@ -515,17 +570,7 @@ public class WireParser implements Closeable {
}
private void send(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
final int len = (int) (sendBufferPtr - sendBuffer);
dumpBuffer('<', sendBuffer, len);
final int n = nf.send(fd, sendBuffer, len);
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
if (n < len) {
doSendWithRetries(fd, n, len - n);
}
sendBufferPtr = sendBuffer;
doSend(fd, 0, (int) (sendBufferPtr - sendBuffer));
}
private void sendClearTextPasswordChallenge(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
......@@ -654,6 +699,16 @@ public class WireParser implements Closeable {
}
responseAsciiSink.putLen(b);
}
currentCursor = Misc.free(currentCursor);
sendCurrentCursorTail = true;
send(fd);
}
private void sendExecuteTail(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
prepareCommandComplete();
prepareReadyForQuery();
LOG.info().$("executed query").$();
sendCurrentCursorTail = false;
send(fd);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册