提交 703c8557 编写于 作者: V Vlad Ilyushchenko

PG: non-blocking server, receive flow control, working tests. I still need to...

PG: non-blocking server, receive flow control, working tests. I still need to test send flow control.
上级 cccb0988
...@@ -97,7 +97,7 @@ public class HttpServer implements Closeable { ...@@ -97,7 +97,7 @@ public class HttpServer implements Closeable {
} }
} }
public SOCountDownLatch getStartedLatch() { SOCountDownLatch getStartedLatch() {
return started; return started;
} }
...@@ -206,7 +206,7 @@ public class HttpServer implements Closeable { ...@@ -206,7 +206,7 @@ public class HttpServer implements Closeable {
} }
} }
private class HttpContextFactory implements IOContextFactory<HttpConnectionContext> { private static class HttpContextFactory implements IOContextFactory<HttpConnectionContext> {
@Override @Override
public HttpConnectionContext newInstance(long fd) { public HttpConnectionContext newInstance(long fd) {
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire;
import com.questdb.network.NetworkFacade;
import com.questdb.network.NetworkFacadeImpl;
public class DefaultWireParserConfiguration implements WireParserConfiguration {
@Override
public int getIdleRecvCountBeforeGivingUp() {
return 10_000;
}
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int getRecvBufferSize() {
return 1024 * 1024;
}
@Override
public int getSendBufferSize() {
return 1024 * 1024;
}
@Override
public int getIdleSendCountBeforeGivingUp() {
return 10_000;
}
}
...@@ -37,10 +37,7 @@ import com.questdb.griffin.SqlException; ...@@ -37,10 +37,7 @@ import com.questdb.griffin.SqlException;
import com.questdb.griffin.engine.functions.bind.BindVariableService; import com.questdb.griffin.engine.functions.bind.BindVariableService;
import com.questdb.log.Log; import com.questdb.log.Log;
import com.questdb.log.LogFactory; import com.questdb.log.LogFactory;
import com.questdb.network.NetworkFacade; import com.questdb.network.*;
import com.questdb.network.NoSpaceLeftInResponseBufferException;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.*; import com.questdb.std.*;
import com.questdb.std.microtime.DateFormatUtils; import com.questdb.std.microtime.DateFormatUtils;
import com.questdb.std.str.AbstractCharSink; import com.questdb.std.str.AbstractCharSink;
...@@ -56,14 +53,14 @@ import static com.questdb.std.time.DateFormatUtils.defaultLocale; ...@@ -56,14 +53,14 @@ import static com.questdb.std.time.DateFormatUtils.defaultLocale;
public class WireParser implements Closeable { public class WireParser implements Closeable {
public static final byte MESSAGE_TYPE_LOGIN_RESPONSE = 'R'; private static final byte MESSAGE_TYPE_LOGIN_RESPONSE = 'R';
public static final byte MESSAGE_TYPE_READY_FOR_QUERY = 'Z'; private static final byte MESSAGE_TYPE_READY_FOR_QUERY = 'Z';
public static final byte MESSAGE_TYPE_PARAMETER_STATUS = 'S'; private static final byte MESSAGE_TYPE_PARAMETER_STATUS = 'S';
public static final byte MESSAGE_TYPE_COMMAND_COMPLETE = 'C'; private static final byte MESSAGE_TYPE_COMMAND_COMPLETE = 'C';
public static final byte MESSAGE_TYPE_DATA_ROW = 'D'; private static final byte MESSAGE_TYPE_DATA_ROW = 'D';
public static final byte MESSAGE_TYPE_ROW_DESCRIPTION = 'T'; private static final byte MESSAGE_TYPE_ROW_DESCRIPTION = 'T';
public static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E'; private static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E';
public static final byte MESSAGE_TYPE_PARSE_COMPLETE = '1'; private static final byte MESSAGE_TYPE_PARSE_COMPLETE = '1';
private final static Log LOG = LogFactory.getLog(WireParser.class); private final static Log LOG = LogFactory.getLog(WireParser.class);
private static final IntIntHashMap typeOidMap = new IntIntHashMap(); private static final IntIntHashMap typeOidMap = new IntIntHashMap();
private final NetworkFacade nf; private final NetworkFacade nf;
...@@ -75,6 +72,7 @@ public class WireParser implements Closeable { ...@@ -75,6 +72,7 @@ public class WireParser implements Closeable {
private final SqlCompiler compiler; private final SqlCompiler compiler;
private final ResponseAsciiSink responseAsciiSink = new ResponseAsciiSink(); private final ResponseAsciiSink responseAsciiSink = new ResponseAsciiSink();
private final int idleSendCountBeforeGivingUp; private final int idleSendCountBeforeGivingUp;
private final int idleRecvCountBeforeGivingUp;
private final AssociativeCache<RecordCursorFactory> factoryCache = new AssociativeCache<>(16, 16); private final AssociativeCache<RecordCursorFactory> factoryCache = new AssociativeCache<>(16, 16);
private final DirectByteCharSequence dbcs = new DirectByteCharSequence(); private final DirectByteCharSequence dbcs = new DirectByteCharSequence();
private final BindVariableService bindVariableService = new BindVariableService(); private final BindVariableService bindVariableService = new BindVariableService();
...@@ -97,6 +95,7 @@ public class WireParser implements Closeable { ...@@ -97,6 +95,7 @@ public class WireParser implements Closeable {
this.sendBufferPtr = sendBuffer; this.sendBufferPtr = sendBuffer;
this.sendBufferLimit = sendBuffer + sendBufferSize; this.sendBufferLimit = sendBuffer + sendBufferSize;
this.idleSendCountBeforeGivingUp = configuration.getIdleSendCountBeforeGivingUp(); this.idleSendCountBeforeGivingUp = configuration.getIdleSendCountBeforeGivingUp();
this.idleRecvCountBeforeGivingUp = configuration.getIdleRecvCountBeforeGivingUp();
this.dumpNetworkTraffic = configuration.getDumpNetworkTraffic(); this.dumpNetworkTraffic = configuration.getDumpNetworkTraffic();
} }
...@@ -107,32 +106,47 @@ public class WireParser implements Closeable { ...@@ -107,32 +106,47 @@ public class WireParser implements Closeable {
Misc.free(compiler); Misc.free(compiler);
} }
public void recv(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException { public void process(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException, PeerIsSlowToWriteException {
final int remaining = (int) (recvBufferSize - recvBufferOffset); final int remaining = (int) (recvBufferSize - recvBufferOffset);
if (remaining < 1) { if (remaining < 1) {
throw new RuntimeException("buffer overflow"); throw new RuntimeException("buffer overflow");
} }
final int n = nf.recv(fd, recvBuffer + recvBufferOffset, remaining); int n = doReceive(fd, remaining);
dumpBuffer('>', recvBuffer + recvBufferOffset, n);
if (n < 0) { if (n < 0) {
throw PeerDisconnectedException.INSTANCE; throw PeerDisconnectedException.INSTANCE;
} }
if (n == 0) { if (n == 0) {
// todo: stay in tight loop for a bit before giving up int retriesRemaining = idleRecvCountBeforeGivingUp;
// todo: this exception is misplaced - peer is writing here while (retriesRemaining > 0) {
throw PeerIsSlowToReadException.INSTANCE; n = doReceive(fd, remaining);
if (n == 0) {
retriesRemaining--;
continue;
}
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
break;
}
if (retriesRemaining == 0) {
throw PeerIsSlowToWriteException.INSTANCE;
}
} }
int parsed = parse(fd, recvBuffer, n); int totalLen = (int) (n + recvBufferOffset);
int parsed = parse(fd, recvBuffer, totalLen);
if (parsed == 0) { if (parsed == 0) {
recvBufferOffset += n; recvBufferOffset += n;
} else if (parsed < n) { } else if (parsed < totalLen) {
int offset = parsed; int offset = parsed;
while (true) { while (true) {
int len = n - offset; int len = totalLen - offset;
parsed = parse(fd, recvBuffer + offset, len); parsed = parse(fd, recvBuffer + offset, len);
if (parsed == 0) { if (parsed == 0) {
// shift to start // shift to start
...@@ -140,7 +154,7 @@ public class WireParser implements Closeable { ...@@ -140,7 +154,7 @@ public class WireParser implements Closeable {
recvBufferOffset = len; recvBufferOffset = len;
// read more // read more
break; break;
} else if (parsed < (n - offset)) { } else if (parsed < (totalLen - offset)) {
offset += parsed; offset += parsed;
} else { } else {
recvBufferOffset = 0; recvBufferOffset = 0;
...@@ -152,6 +166,12 @@ public class WireParser implements Closeable { ...@@ -152,6 +166,12 @@ public class WireParser implements Closeable {
} }
} }
private int doReceive(long fd, int remaining) {
int n = nf.recv(fd, recvBuffer + recvBufferOffset, remaining);
dumpBuffer('>', recvBuffer + recvBufferOffset, n);
return n;
}
public void sendRemaininBuffer(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException { public void sendRemaininBuffer(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (bufferRemainingSize > 0) { if (bufferRemainingSize > 0) {
doSendWithRetries(fd, bufferRemainingOffset, bufferRemainingSize); doSendWithRetries(fd, bufferRemainingOffset, bufferRemainingSize);
...@@ -190,7 +210,7 @@ public class WireParser implements Closeable { ...@@ -190,7 +210,7 @@ public class WireParser implements Closeable {
} }
private void dumpBuffer(char direction, long buffer, int len) { private void dumpBuffer(char direction, long buffer, int len) {
if (dumpNetworkTraffic) { if (dumpNetworkTraffic && len > 0) {
StdoutSink.INSTANCE.put(direction); StdoutSink.INSTANCE.put(direction);
dump(buffer, len); dump(buffer, len);
} }
......
...@@ -30,6 +30,8 @@ public interface WireParserConfiguration { ...@@ -30,6 +30,8 @@ public interface WireParserConfiguration {
return false; return false;
} }
int getIdleRecvCountBeforeGivingUp();
NetworkFacade getNetworkFacade(); NetworkFacade getNetworkFacade();
int getRecvBufferSize(); int getRecvBufferSize();
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.network;
public class PeerIsSlowToWriteException extends Exception {
public static final PeerIsSlowToWriteException INSTANCE = new PeerIsSlowToWriteException();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册