提交 3c72f787 编写于 作者: B bluestreak02

PG progress

上级 ebc2b31d
......@@ -346,7 +346,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
......
......@@ -59,7 +59,6 @@ class TextImportProcessorState implements Mutable, Closeable {
new DateFormatFactory(),
com.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new com.questdb.std.microtime.DateFormatFactory()
);
}
......
......@@ -23,18 +23,29 @@
package com.questdb.cutlass.pgwire;
import com.questdb.cairo.CairoEngine;
import com.questdb.cairo.ColumnType;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursor;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils;
import com.questdb.cutlass.pgwire.codecs.in.StartupMessage;
import com.questdb.cutlass.pgwire.codecs.out.AuthenticationMsg;
import com.questdb.cutlass.pgwire.codecs.out.ParameterStatusMsg;
import com.questdb.cutlass.pgwire.codecs.out.ReadyForQueryMsg;
import com.questdb.griffin.SqlCompiler;
import com.questdb.griffin.SqlException;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.Net;
import com.questdb.network.NetworkFacade;
import com.questdb.network.PeerDisconnectedException;
import com.questdb.network.PeerIsSlowToReadException;
import com.questdb.std.Chars;
import com.questdb.std.IntIntHashMap;
import com.questdb.std.Misc;
import com.questdb.std.Unsafe;
import com.questdb.std.str.AbstractCharSink;
import com.questdb.std.str.CharSink;
import com.questdb.std.str.DirectByteCharSequence;
import java.io.Closeable;
......@@ -42,27 +53,43 @@ import java.io.Closeable;
public class WireParser implements Closeable {
private final static Log LOG = LogFactory.getLog(WireParser.class);
private static final IntIntHashMap typeOidMap = new IntIntHashMap();
static {
typeOidMap.put(ColumnType.STRING, 1043); // VARCHAR
typeOidMap.put(ColumnType.TIMESTAMP, 1184); // TIMESTAMPZ
typeOidMap.put(ColumnType.DOUBLE, 701); // FLOAT8
typeOidMap.put(ColumnType.FLOAT, 700);
typeOidMap.put(ColumnType.INT, 23); // INT4
typeOidMap.put(ColumnType.SHORT, 21); // INT2
}
private int state = 0;
private final NetworkFacade nf;
private final long recvBuffer;
private final long sendBuffer;
private final int recvBufferSize;
private final int sendBufferSize;
private final SqlCompiler compiler;
private final ResponseAsciiSink responseAsciiSink = new ResponseAsciiSink();
private long sendBufferPtr;
private int state = 0;
private long recvBufferOffset = 0;
public WireParser(WireParserConfiguration configuration) {
public WireParser(WireParserConfiguration configuration, CairoEngine engine) {
this.nf = configuration.getNetworkFacade();
this.compiler = new SqlCompiler(engine);
this.recvBufferSize = configuration.getRecvBufferSize();
this.recvBuffer = Unsafe.malloc(this.recvBufferSize);
this.sendBufferSize = configuration.getSendBufferSize();
this.sendBuffer = Unsafe.malloc(this.sendBufferSize);
this.sendBufferPtr = sendBuffer;
}
@Override
public void close() {
Unsafe.free(sendBuffer, sendBufferSize);
Unsafe.free(recvBuffer, recvBufferSize);
Misc.free(compiler);
}
public void recv(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException {
......@@ -73,7 +100,7 @@ public class WireParser implements Closeable {
}
final int n = Net.recv(fd, recvBuffer + recvBufferOffset, remaining);
if (n == -1) {
if (n < 0) {
throw PeerDisconnectedException.INSTANCE;
}
......@@ -95,30 +122,120 @@ public class WireParser implements Closeable {
}
private void executeParseAndSendResult(long fd, CharSequence query) {
long offset;
// send 'ParseComplete'
AuthenticationMsg.setType(sendBuffer, (byte) '1');
AuthenticationMsg.setLen(sendBuffer, 4);
responseAsciiSink.reset();
if (Chars.startsWith(query, "SET")) {
prepareParseComplete();
prepareReadyForQuery();
send(fd);
} else {
try {
try (RecordCursorFactory factory = compiler.compile(query)) {
if (factory != null) {
prepareRowDescription(factory.getMetadata());
RecordCursor cursor = factory.getCursor();
Record record = cursor.getRecord();
RecordMetadata metadata = factory.getMetadata();
final int columnCount = metadata.getColumnCount();
while (cursor.hasNext()) {
responseAsciiSink.put('D'); // data
long b = responseAsciiSink.skip();
long a;
responseAsciiSink.putNetworkShort((short) columnCount);
for (int i = 0; i < columnCount; i++) {
switch (metadata.getColumnType(i)) {
case ColumnType.INT:
a = responseAsciiSink.skip();
responseAsciiSink.put(record.getInt(i));
responseAsciiSink.putLenEx(a);
break;
case ColumnType.STRING:
responseAsciiSink.putNetworkInt(record.getStrLen(i));
responseAsciiSink.encodeUtf8(record.getStr(i));
break;
case ColumnType.TIMESTAMP:
responseAsciiSink.putNetworkInt(Long.BYTES);
responseAsciiSink.put(record.getTimestamp(i));
break;
case ColumnType.DOUBLE:
responseAsciiSink.putNetworkInt(Double.BYTES);
responseAsciiSink.put(record.getDouble(i), 3);
break;
default:
assert false;
}
}
responseAsciiSink.putLen(b);
}
offset = 5;
// Net.send(clientFd, sendBuffer, 5);
prepareCommandComplete();
prepareReadyForQuery();
send(fd);
} else {
prepareParseComplete();
prepareReadyForQuery();
send(fd);
}
}
} catch (SqlException e) {
prepareError(e);
prepareReadyForQuery();
send(fd);
}
}
}
private void prepareCommandComplete() {
responseAsciiSink.put('C');
long addr = responseAsciiSink.skip();
responseAsciiSink.encodeUtf8("SELECT ").put(0).put(' ').put(0).put((char) 0);
responseAsciiSink.putLen(addr);
}
private void prepareRowDescription(RecordMetadata metadata) {
responseAsciiSink.put('T');
final long addr = responseAsciiSink.skip();
final int n = metadata.getColumnCount();
responseAsciiSink.putNetworkShort((short) n);
for (int i = 0; i < n; i++) {
responseAsciiSink.encodeUtf8Z(metadata.getColumnName(i));
responseAsciiSink.putNetworkInt(0);
responseAsciiSink.putNetworkShort((short) 0);
responseAsciiSink.putNetworkInt(typeOidMap.get(metadata.getColumnType(i))); // type
responseAsciiSink.putNetworkShort((short) 0); // type size?
responseAsciiSink.putNetworkInt(0); // type mod?
responseAsciiSink.putNetworkShort((short) 0); // format code
}
responseAsciiSink.putLen(addr);
}
private void prepareNoData() {
responseAsciiSink.put('n');
responseAsciiSink.putNetworkInt(Integer.BYTES);
}
// send 'ReadyForQuery'
ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z');
ReadyForQueryMsg.setLen(sendBuffer + offset, 5);
ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I');
Net.send(fd, sendBuffer, (int) (offset + 6));
private void prepareError(SqlException e) {
responseAsciiSink.put('E');
long addr = responseAsciiSink.skip();
responseAsciiSink.put('M');
responseAsciiSink.encodeUtf8Z(e.getFlyweightMessage());
responseAsciiSink.put('S');
responseAsciiSink.encodeUtf8Z("ERROR");
responseAsciiSink.put('P').put(e.getPosition()).put((char) 0);
responseAsciiSink.putLen(addr);
}
private void prepareParseComplete() {
responseAsciiSink.put('1');
responseAsciiSink.putNetworkInt(Integer.BYTES);
}
/**
* returns address of where parsing stopped. If there are remaining bytes left
* int the buffer they need to be passed again in parse function along with
* any additional bytes received
*
* @param address
* @param len
* @return
*/
private boolean parse(long fd, long address, int len) {
long limit = address + len;
......@@ -200,6 +317,7 @@ public class WireParser implements Closeable {
}
msgLen = AbstractTypePrefixedHeader.getLen(address);
assert msgLen > 0;
// msgLen does not take into account type byte
if (msgLen > remaining - 1) {
......@@ -235,6 +353,7 @@ public class WireParser implements Closeable {
// possibly more, check QueryExecutionImpl.processResults() in PG driver for more info
hi = getStringLength(lo, msgLimit);
assert hi >= lo;
CharSequence preparedStatementName = new DirectByteCharSequence().of(lo, hi);
LOG.info().$("prepared statement name: ").$(preparedStatementName).$();
......@@ -262,45 +381,48 @@ public class WireParser implements Closeable {
}
private void sendClearTextPasswordChallenge(long fd) {
AuthenticationMsg.setType(sendBuffer, (byte) 'R');
AuthenticationMsg.setLen(sendBuffer, 8);
AuthenticationMsg.setResponseCode(sendBuffer, 3);
Net.send(fd, sendBuffer, 9);
// todo: deal with incomplete send
responseAsciiSink.reset();
responseAsciiSink.put('R');
responseAsciiSink.putNetworkInt(8);
responseAsciiSink.putNetworkInt(3);
send(fd);
}
private void sendLoginOk(long fd) {
// send login ok
// send authentication challenge
AuthenticationMsg.setType(sendBuffer, (byte) 'R');
AuthenticationMsg.setLen(sendBuffer, 8);
AuthenticationMsg.setResponseCode(sendBuffer, 0);
// Net.send(clientFd, sendBuffer, 9);
// length so far 9
// send 'ParameterStatus'
long offset = 9;
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"TimeZone", "GMT");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"application_name", "QuestDB");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"server_version_num", "100000");
offset += ParameterStatusMsg.setParameterPair(
sendBuffer + offset,
"integer_datetimes", "on");
// send 'ReadyForQuery'
ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z');
ReadyForQueryMsg.setLen(sendBuffer + offset, 5);
ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I');
nf.send(fd, sendBuffer, (int) (offset + 6));
responseAsciiSink.reset();
prepareLoginOk();
prepareParams("TimeZone", "GMT");
prepareParams("application_name", "QuestDB");
prepareParams("server_version_num", "100000");
prepareParams("integer_datetimes", "on");
prepareReadyForQuery();
send(fd);
}
private void send(long fd) {
// todo: deal with incomplete send
int n = nf.send(fd, sendBuffer, (int) (sendBufferPtr - sendBuffer));
LOG.info().$("sent [n=").$(n).$(']').$();
}
private void prepareLoginOk() {
responseAsciiSink.put('R');
responseAsciiSink.putNetworkInt(8); // length of this message
responseAsciiSink.putNetworkInt(0); // response code
}
private void prepareReadyForQuery() {
responseAsciiSink.put('Z');
responseAsciiSink.putNetworkInt(5);
responseAsciiSink.put('I');
}
private void prepareParams(String timeZone, String gmt) {
responseAsciiSink.put('S');
final long addr = responseAsciiSink.skip();
responseAsciiSink.encodeUtf8Z(timeZone);
responseAsciiSink.encodeUtf8Z(gmt);
responseAsciiSink.putLen(addr);
}
private long getStringLength(long x, long limit) {
......@@ -313,8 +435,53 @@ public class WireParser implements Closeable {
return -1;
}
@FunctionalInterface
public interface MessageHandler {
int onMessage(int action);
private class ResponseAsciiSink extends AbstractCharSink {
@Override
public CharSink put(CharSequence cs, int start, int end) {
for (int i = start; i < end; i++) {
Unsafe.getUnsafe().putByte(sendBufferPtr + i, (byte) cs.charAt(i));
}
sendBufferPtr += (end - start);
return this;
}
@Override
public CharSink put(char c) {
Unsafe.getUnsafe().putByte(sendBufferPtr++, (byte) c);
return this;
}
public void putNetworkInt(int len) {
NetworkByteOrderUtils.putInt(sendBufferPtr, len);
sendBufferPtr += Integer.BYTES;
}
public void putNetworkShort(short value) {
NetworkByteOrderUtils.putShort(sendBufferPtr, value);
sendBufferPtr += Short.BYTES;
}
public void putLen(long start) {
NetworkByteOrderUtils.putInt(start, (int) (sendBufferPtr - start));
}
public void putLenEx(long start) {
NetworkByteOrderUtils.putInt(start, (int) (sendBufferPtr - start - Integer.BYTES));
}
long skip() {
long checkpoint = sendBufferPtr;
sendBufferPtr += Integer.BYTES;
return checkpoint;
}
void reset() {
sendBufferPtr = sendBuffer;
}
void encodeUtf8Z(CharSequence value) {
encodeUtf8(value);
Unsafe.getUnsafe().putByte(sendBufferPtr++, (byte) 0);
}
}
}
......@@ -27,10 +27,10 @@ import com.questdb.std.Unsafe;
public class NetworkByteOrderUtils {
public static int getInt(long address) {
int b = Unsafe.getUnsafe().getByte(address);
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 1);
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 2);
return (b << 8) | Unsafe.getUnsafe().getByte(address + 3);
int b = Unsafe.getUnsafe().getByte(address) & 0xff;
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 1) & 0xff;
b = (b << 8) | Unsafe.getUnsafe().getByte(address + 2) & 0xff;
return (b << 8) | Unsafe.getUnsafe().getByte(address + 3) & 0xff;
}
public static void putInt(long address, int value) {
......@@ -39,4 +39,9 @@ public class NetworkByteOrderUtils {
Unsafe.getUnsafe().putByte(address + 2, (byte) (value >>> 8));
Unsafe.getUnsafe().putByte(address + 3, (byte) (value));
}
public static void putShort(long address, short value) {
Unsafe.getUnsafe().putByte(address, (byte) (value >>> 8));
Unsafe.getUnsafe().putByte(address + 1, (byte) (value));
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
public class AuthenticationMD5PasswordMsg extends AuthenticationMsg {
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils;
public class AuthenticationMsg extends AbstractTypePrefixedHeader {
public static void setResponseCode(long address, int status) {
NetworkByteOrderUtils.putInt(address + AbstractTypePrefixedHeader.LEN, status);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.std.Unsafe;
public class ParameterStatusMsg extends AbstractTypePrefixedHeader {
public static int setParameterPair(long address, CharSequence name, CharSequence value) {
setType(address, (byte) 'S');
long p = address + AbstractTypePrefixedHeader.LEN;
final long start = p;
p = copyStringZ(p, name);
p = copyStringZ(p, value);
int len = (int) (p - start);
setLen(address, len + AbstractTypePrefixedHeader.LEN - 1);
return len + AbstractTypePrefixedHeader.LEN;
}
private static long copyStringZ(long p, CharSequence value) {
for (int i = 0, m = value.length(); i < m; i++) {
Unsafe.getUnsafe().putByte(p++, (byte) value.charAt(i));
}
Unsafe.getUnsafe().putByte(p, (byte) 0);
return p + 1;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.pgwire.codecs.out;
import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader;
import com.questdb.std.Unsafe;
public class ReadyForQueryMsg extends AbstractTypePrefixedHeader {
public static void setStatus(long address, byte status) {
Unsafe.getUnsafe().putByte(address + AbstractTypePrefixedHeader.LEN, status);
}
}
......@@ -23,16 +23,15 @@
package com.questdb.cutlass.pgwire;
import com.questdb.griffin.AbstractGriffinTest;
import com.questdb.network.*;
import com.questdb.std.Os;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.*;
import java.util.Properties;
public class WireParserTest {
public class WireParserTest extends AbstractGriffinTest {
@Test
public void testSimple() throws SQLException {
......@@ -55,8 +54,7 @@ public class WireParserTest {
public int getSendBufferSize() {
return 1024 * 1024;
}
});
}, engine);
Net.setReusePort(fd);
......@@ -81,10 +79,16 @@ public class WireParserTest {
properties.setProperty("sslmode", "disable");
final Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9120/nabu_app", properties);
// Statement statement = connection.createStatement();
// statement.executeQuery("select * from tab");
Statement statement = connection.createStatement();
// ResultSet rs = statement.executeQuery("select rnd_str(4,4,0) s, timestamp_sequence(to_timestamp(0),10000) t, rnd_double(0) d, rnd_int() i from long_sequence(4000)2");
ResultSet rs = statement.executeQuery("select rnd_str(4,4,0) s, rnd_int() i from long_sequence(5)");
while (rs.next()) {
System.out.print(rs.getString(1));
System.out.print(",");
System.out.print(rs.getInt(2));
System.out.println();
}
connection.close();
} else {
throw NetworkError.instance(Os.errno()).couldNotBindSocket();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册