提交 c93a85e1 编写于 作者: V Vlad Ilyushchenko

net: very raw draft of PostgreSQL wire protocol impl

上级 c1b7d531
......@@ -252,5 +252,12 @@
<version>2.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -23,6 +23,8 @@
package com.questdb.cutlass.http;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.str.CharSink;
public interface HttpChunkedResponseSocket extends CharSink {
......
......@@ -25,10 +25,7 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.IOContext;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.NetworkFacade;
import com.questdb.network.*;
import com.questdb.std.*;
import com.questdb.std.str.DirectByteCharSequence;
......
......@@ -23,6 +23,9 @@
package com.questdb.cutlass.http;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
public interface HttpMultipartContentListener {
void onChunk(HttpRequestHeader partHeader, long lo, long hi);
......
......@@ -23,6 +23,8 @@
package com.questdb.cutlass.http;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.Mutable;
import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence;
......
......@@ -23,6 +23,9 @@
package com.questdb.cutlass.http;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
public interface HttpRawSocket {
long getBufferAddress();
......
......@@ -24,6 +24,8 @@
package com.questdb.cutlass.http;
import com.questdb.network.IODispatcher;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
public interface HttpRequestProcessor {
void onHeadersReady(HttpConnectionContext context);
......
......@@ -23,6 +23,8 @@
package com.questdb.cutlass.http;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.str.CharSink;
public interface HttpResponseHeader extends CharSink {
......
......@@ -26,6 +26,8 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.NetworkFacade;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.*;
import com.questdb.std.ex.ZLibException;
import com.questdb.std.str.AbstractCharSink;
......
......@@ -37,6 +37,8 @@ import com.questdb.log.LogFactory;
import com.questdb.log.LogRecord;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.*;
import com.questdb.std.str.CharSink;
......
......@@ -28,6 +28,8 @@ import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.*;
import com.questdb.std.str.FileNameExtractorCharSequence;
import com.questdb.std.str.LPSZ;
......
......@@ -25,9 +25,13 @@ package com.questdb.cutlass.http.processors;
import com.questdb.cairo.CairoEngine;
import com.questdb.cairo.TableUtils;
import com.questdb.cutlass.http.*;
import com.questdb.cutlass.http.HttpChunkedResponseSocket;
import com.questdb.cutlass.http.HttpConnectionContext;
import com.questdb.cutlass.http.HttpRequestProcessor;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.Chars;
import com.questdb.std.Misc;
import com.questdb.std.str.Path;
......
......@@ -34,6 +34,8 @@ import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.*;
import com.questdb.std.str.CharSink;
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.cutlass.pgwire.codecs.in.StartupMessage;
import com.questdb.cutlass.pgwire.codecs.out.AuthenticationMsg;
import com.questdb.cutlass.pgwire.codecs.out.ParameterStatusMsg;
import com.questdb.cutlass.pgwire.codecs.out.ReadyForQueryMsg;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.Net;
import com.questdb.network.NetworkFacade;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence;
import java.io.Closeable;
public class WireParser implements Closeable {
private final static Log LOG = LogFactory.getLog(WireParser.class);
private int state = 0;
private final NetworkFacade nf;
private final long recvBuffer;
private final long sendBuffer;
private final int recvBufferSize;
private final int sendBufferSize;
private long recvBufferOffset = 0;
public WireParser(WireParserConfiguration configuration) {
this.nf = configuration.getNetworkFacade();
this.recvBufferSize = configuration.getRecvBufferSize();
this.recvBuffer = Unsafe.malloc(this.recvBufferSize);
this.sendBufferSize = configuration.getSendBufferSize();
this.sendBuffer = Unsafe.malloc(this.sendBufferSize);
}
@Override
public void close() {
Unsafe.free(sendBuffer, sendBufferSize);
Unsafe.free(recvBuffer, recvBufferSize);
}
public void recv(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
final int remaining = (int) (recvBufferSize - recvBufferOffset);
if (remaining < 1) {
throw new RuntimeException("buffer overflow");
}
final int n = Net.recv(fd, recvBuffer + recvBufferOffset, remaining);
if (n == -1) {
throw PeerDisconnectedException.INSTANCE;
}
if (n == 0) {
// todo: stay in tight loop for a bit before giving up
// todo: this exception is misplaced - peer is writing here
throw PeerIsSlowToReadException.INSTANCE;
}
if (parse(fd, recvBuffer, n)) {
recvBufferOffset = 0;
} else {
recvBufferOffset += n;
}
}
private void disconnectClient(long fd) {
nf.close(fd);
}
private void executeParseAndSendResult(long fd, CharSequence query) {
long offset;
// send 'ParseComplete'
AuthenticationMsg.setType(sendBuffer, (byte) '1');
AuthenticationMsg.setLen(sendBuffer, 4);
offset = 5;
// Net.send(clientFd, sendBuffer, 5);
// send 'ReadyForQuery'
ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z');
ReadyForQueryMsg.setLen(sendBuffer + offset, 5);
ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I');
Net.send(fd, sendBuffer, (int) (offset + 6));
}
/**
* returns address of where parsing stopped. If there are remaining bytes left
* int the buffer they need to be passed again in parse function along with
* any additional bytes received
*
* @param address
* @param len
* @return
*/
private boolean parse(long fd, long address, int len) {
long limit = address + len;
int msgLen;
long msgLimit;
final int remaining = (int) (limit - address);
switch (state) {
case 0:
// expect startup request
if (remaining < 4) {
return false;
}
// there is data for length
// this is quite specific to message type :(
msgLen = StartupMessage.getLen(address); // postgesql includes length bytes in length of message
// do we have the rest of the message?
if (msgLen > remaining) {
// we have length - get the rest when ready
return false;
}
// 'StartupMessage'
// consume message
// process protocol
int protocol = StartupMessage.getProtocol(address);
// todo: validate protocol, see 'NegotiateProtocolVersion'
// extract properties
msgLimit = address + msgLen;
long lo = address + 8; // 8 is offset where name value pairs begin
// there is an extra byte at the end and it has to be 0
while (lo < msgLimit - 1) {
long hi = getStringLength(lo, msgLimit);
// todo: close connection when protocol is broken
assert hi > -1;
CharSequence name = new DirectByteCharSequence().of(lo, hi);
// name is ready
lo = hi + 1;
hi = getStringLength(lo, msgLimit);
assert hi > -1;
CharSequence value = new DirectByteCharSequence().of(lo, hi);
lo = hi + 1;
LOG.info()
.$("protocol [major=").$(protocol >> 16)
.$(", minor=").$((short) protocol)
.$(", name=").$(name)
.$(", value=").$(value)
.$(']').$();
}
// todo: close connection if protocol is violated
assert Unsafe.getUnsafe().getByte(lo) == 0;
// todo: check that there is no more data sent
assert lo + 1 == limit;
sendClearTextPasswordChallenge(fd);
state = 3;
return true;
case 3:
// this is a type-prefixed message
// we will wait until we receive the entire header
if (remaining < AbstractTypePrefixedHeader.LEN) {
// we need to be able to read header and length
return false;
}
msgLen = AbstractTypePrefixedHeader.getLen(address);
// msgLen does not take into account type byte
if (msgLen > remaining - 1) {
return false;
}
final byte type = AbstractTypePrefixedHeader.getType(address);
LOG.info().$("got msg '").$((char) type).$('\'').$();
msgLimit = address + msgLen + 1;
lo = address + AbstractTypePrefixedHeader.LEN; // 8 is offset where name value pairs begin
switch (type) {
case 'p':
// +1 is 'type' byte that message length does not account for
long hi = getStringLength(lo, msgLimit);
CharSequence password = new DirectByteCharSequence().of(lo, hi);
LOG.info().$("password=").$(password).$();
// todo: check that this is all client sent
assert limit == msgLimit;
// send login ok
sendLoginOk(fd);
break;
case 'P':
// 'Parse'
// this appears to be the execution side - we must at least return 'RowDescription'
// possibly more, check QueryExecutionImpl.processResults() in PG driver for more info
hi = getStringLength(lo, msgLimit);
CharSequence preparedStatementName = new DirectByteCharSequence().of(lo, hi);
LOG.info().$("prepared statement name: ").$(preparedStatementName).$();
lo = hi + 1;
hi = getStringLength(lo, msgLimit);
CharSequence query = new DirectByteCharSequence().of(lo, hi);
LOG.info().$("query: ").$(query).$();
// todo: read parameter information
executeParseAndSendResult(fd, query);
break;
case 'X':
// 'Terminate'
disconnectClient(fd);
state = 0;
break;
}
}
return true;
}
private void sendClearTextPasswordChallenge(long fd) {
AuthenticationMsg.setType(sendBuffer, (byte) 'R');
AuthenticationMsg.setLen(sendBuffer, 8);
AuthenticationMsg.setResponseCode(sendBuffer, 3);
Net.send(fd, sendBuffer, 9);
// todo: deal with incomplete send
}
private void sendLoginOk(long fd) {
// send login ok
// send authentication challenge
AuthenticationMsg.setType(sendBuffer, (byte) 'R');
AuthenticationMsg.setLen(sendBuffer, 8);
AuthenticationMsg.setResponseCode(sendBuffer, 0);
// Net.send(clientFd, sendBuffer, 9);
// length so far 9
// send 'ParameterStatus'
long offset = 9;
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"TimeZone", "GMT");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"application_name", "QuestDB");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"server_version_num", "100000");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"integer_datetimes", "on");
// send 'ReadyForQuery'
ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z');
ReadyForQueryMsg.setLen(sendBuffer + offset, 5);
ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I');
nf.send(fd, sendBuffer, (int) (offset + 6));
}
private long getStringLength(long x, long limit) {
// calculate length
for (long i = x; i < limit; i++) {
if (Unsafe.getUnsafe().getByte(i) == 0) {
return i;
}
}
return -1;
}
@FunctionalInterface
public interface MessageHandler {
int onMessage(int action);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire;
import com.questdb.network.NetworkFacade;
public interface WireParserConfiguration {
NetworkFacade getNetworkFacade();
int getRecvBufferSize();
int getSendBufferSize();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs;
import com.questdb.std.Unsafe;
public abstract class AbstractTypePrefixedHeader {
public static final int LEN = 5;
public static int getLen(long address) {
return NetworkByteOrderUtils.getInt(address + 1);
}
public static byte getType(long address) {
return Unsafe.getUnsafe().getByte(address);
}
public static void setLen(long address, int len) {
NetworkByteOrderUtils.putInt(address + 1, len);
}
public static void setType(long address, byte type) {
Unsafe.getUnsafe().putByte(address, type);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs;
public final class Constants {
public static final int AUTHENTICATION_OK = 0;
public static final int AUTHENTICATION_KRBv5 = 2;
public static final int AUTHENTICATION_CLEARTEXT_PASSWORD = 3;
public static final int AUTHENTICATION_MD5_PASSWORD = 5;
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs;
import com.questdb.std.Unsafe;
public class NetworkByteOrderUtils {
public static int getInt(long address) {
int b = Unsafe.getUnsafe().getByte(address);
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 1);
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 2);
return (b << 8) | Unsafe.getUnsafe().getByte(address + 3);
}
public static void putInt(long address, int value) {
Unsafe.getUnsafe().putByte(address, (byte) (value >>> 24));
Unsafe.getUnsafe().putByte(address + 1, (byte) (value >>> 16));
Unsafe.getUnsafe().putByte(address + 2, (byte) (value >>> 8));
Unsafe.getUnsafe().putByte(address + 3, (byte) (value));
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.in;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
public class QueryRequest extends AbstractTypePrefixedHeader {
}
......@@ -21,23 +21,16 @@
*
******************************************************************************/
package com.questdb.network;
package com.questdb.cutlass.pgwire.codecs.in;
public final class DisconnectReason {
public static final int PEER = 1;
public static final int IDLE = 2;
public static final int SILLY = 3;
import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils;
public static CharSequence nameOf(int disconnectReason) {
switch (disconnectReason) {
case PEER:
return "PEER";
case IDLE:
return "IDLE";
case SILLY:
return "SILLY";
default:
return "UNKNOWN";
}
public class StartupMessage {
public static int getLen(long address) {
return NetworkByteOrderUtils.getInt(address);
}
public static int getProtocol(long address) {
return NetworkByteOrderUtils.getInt(address + 4);
}
}
......@@ -21,7 +21,8 @@
*
******************************************************************************/
package com.questdb.cutlass.http;
package com.questdb.cutlass.pgwire.codecs.out;
public class AuthenticationMD5PasswordMsg extends AuthenticationMsg {
public class HttpFlowControlException extends Exception {
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils;
public class AuthenticationMsg extends AbstractTypePrefixedHeader {
public static void setResponseCode(long address, int status) {
NetworkByteOrderUtils.putInt(address + AbstractTypePrefixedHeader.LEN, status);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.std.Unsafe;
public class ParameterStatusMsg extends AbstractTypePrefixedHeader {
public static int setParameterPair(long address, CharSequence name, CharSequence value) {
setType(address, (byte) 'S');
long p = address + AbstractTypePrefixedHeader.LEN;
final long start = p;
p = copyStringZ(p, name);
p = copyStringZ(p, value);
int len = (int) (p - start);
setLen(address, len + AbstractTypePrefixedHeader.LEN - 1);
return len + AbstractTypePrefixedHeader.LEN;
}
private static long copyStringZ(long p, CharSequence value) {
for (int i = 0, m = value.length(); i < m; i++) {
Unsafe.getUnsafe().putByte(p++, (byte) value.charAt(i));
}
Unsafe.getUnsafe().putByte(p, (byte) 0);
return p + 1;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.std.Unsafe;
public class ReadyForQueryMsg extends AbstractTypePrefixedHeader {
public static void setStatus(long address, byte status) {
Unsafe.getUnsafe().putByte(address + AbstractTypePrefixedHeader.LEN, status);
}
}
......@@ -21,8 +21,8 @@
*
******************************************************************************/
package com.questdb.cutlass.http;
package com.questdb.network;
public class PeerDisconnectedException extends HttpFlowControlException {
public class PeerDisconnectedException extends Exception {
public static final PeerDisconnectedException INSTANCE = new PeerDisconnectedException();
}
......@@ -21,8 +21,8 @@
*
******************************************************************************/
package com.questdb.cutlass.http;
package com.questdb.network;
public class PeerIsSlowToReadException extends HttpFlowControlException {
public class PeerIsSlowToReadException extends Exception {
public static final PeerIsSlowToReadException INSTANCE = new PeerIsSlowToReadException();
}
......@@ -44,9 +44,8 @@ import com.questdb.std.str.StringSink;
import com.questdb.std.time.MillisecondClock;
import com.questdb.test.tools.TestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -56,6 +55,14 @@ import java.util.concurrent.locks.LockSupport;
public class IODispatcherTest {
private static Log LOG = LogFactory.getLog(IODispatcherTest.class);
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@Before
public void setUp() throws Exception {
temp.create();
}
@Test
public void testBiasWrite() throws Exception {
......@@ -239,7 +246,7 @@ public class IODispatcherTest {
@Test
public void testImportMultipleOnSameConnection() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir));
......@@ -402,7 +409,7 @@ public class IODispatcherTest {
@Test
public void testImportMultipleOnSameConnectionFragmented() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir));
......@@ -540,7 +547,7 @@ public class IODispatcherTest {
@Test
public void testImportMultipleOnSameConnectionSlow() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
......@@ -687,7 +694,7 @@ public class IODispatcherTest {
@Test
public void testJsonQuery() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
......@@ -769,7 +776,7 @@ public class IODispatcherTest {
TestUtils.assertMemoryLeak(() -> {
final NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(nf, baseDir, 128);
try (CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir));
......@@ -912,7 +919,7 @@ public class IODispatcherTest {
@Test
public void testJsonQuerySyntaxError() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
......@@ -1087,7 +1094,7 @@ public class IODispatcherTest {
@Test
public void testSCPConnectDownloadDisconnect() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (HttpServer httpServer = new HttpServer(httpConfiguration)) {
......@@ -1266,7 +1273,7 @@ public class IODispatcherTest {
@Test
public void testSCPFullDownload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (HttpServer httpServer = new HttpServer(httpConfiguration)) {
......@@ -2097,7 +2104,7 @@ public class IODispatcherTest {
@Ignore
public void testUpload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final String baseDir = temp.getRoot().getAbsolutePath();
// final String baseDir = "/home/vlad/dev/123";
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire;
import com.questdb.network.*;
import com.questdb.std.Os;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
public class WireParserTest {
@Test
public void testSimple() throws SQLException {
// start simple server
long fd = Net.socketTcp(true);
WireParser wireParser = new WireParser(new WireParserConfiguration() {
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int getRecvBufferSize() {
return 1024 * 1024;
}
@Override
public int getSendBufferSize() {
return 1024 * 1024;
}
});
Net.setReusePort(fd);
if (Net.bindTcp(fd, 0, 9120)) {
Net.listen(fd, 128);
new Thread(() -> {
final long clientFd = Net.accept(fd);
while (true) {
try {
wireParser.recv(clientFd);
} catch (PeerDisconnectedException e) {
break;
} catch (PeerIsSlowToReadException ignored) {
}
}
}).start();
Properties properties = new Properties();
properties.setProperty("user", "xyz");
properties.setProperty("password", "oh");
properties.setProperty("sslmode", "disable");
final Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9120/nabu_app", properties);
// Statement statement = connection.createStatement();
// statement.executeQuery("select * from tab");
connection.close();
} else {
throw NetworkError.instance(Os.errno()).couldNotBindSocket();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册