diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessor.java b/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessor.java index 87f73102f0f64bcbfb11c5bcbe76061179931d68..7628ce50a806911c675e0d4dc21a65848120834d 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessor.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessor.java @@ -346,7 +346,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC @Override public void resumeRecv(HttpConnectionContext context, IODispatcher dispatcher) { - this.transientContext = context; this.transientDispatcher = dispatcher; this.transientState = LV.get(context); diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessorState.java b/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessorState.java index 7ad13e7286dba4c1befc6b47ec60f78a79915663..712ee88f37eb5d3722ffc7241e4b8ad2bf704a3b 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessorState.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/TextImportProcessorState.java @@ -59,7 +59,6 @@ class TextImportProcessorState implements Mutable, Closeable { new DateFormatFactory(), com.questdb.std.microtime.DateLocaleFactory.INSTANCE, new com.questdb.std.microtime.DateFormatFactory() - ); } diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/WireParser.java b/core/src/main/java/com/questdb/cutlass/pgwire/WireParser.java index 5323155cae4568da552caed7c326bb979c56a844..e7621826916f7c1e9e842975245733e8c0b20ce8 100644 --- a/core/src/main/java/com/questdb/cutlass/pgwire/WireParser.java +++ b/core/src/main/java/com/questdb/cutlass/pgwire/WireParser.java @@ -23,18 +23,29 @@ package com.questdb.cutlass.pgwire; +import com.questdb.cairo.CairoEngine; +import com.questdb.cairo.ColumnType; +import com.questdb.cairo.sql.Record; +import com.questdb.cairo.sql.RecordCursor; +import com.questdb.cairo.sql.RecordCursorFactory; +import com.questdb.cairo.sql.RecordMetadata; import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader; +import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils; import com.questdb.cutlass.pgwire.codecs.in.StartupMessage; -import com.questdb.cutlass.pgwire.codecs.out.AuthenticationMsg; -import com.questdb.cutlass.pgwire.codecs.out.ParameterStatusMsg; -import com.questdb.cutlass.pgwire.codecs.out.ReadyForQueryMsg; +import com.questdb.griffin.SqlCompiler; +import com.questdb.griffin.SqlException; import com.questdb.log.Log; import com.questdb.log.LogFactory; import com.questdb.network.Net; import com.questdb.network.NetworkFacade; import com.questdb.network.PeerDisconnectedException; import com.questdb.network.PeerIsSlowToReadException; +import com.questdb.std.Chars; +import com.questdb.std.IntIntHashMap; +import com.questdb.std.Misc; import com.questdb.std.Unsafe; +import com.questdb.std.str.AbstractCharSink; +import com.questdb.std.str.CharSink; import com.questdb.std.str.DirectByteCharSequence; import java.io.Closeable; @@ -42,27 +53,43 @@ import java.io.Closeable; public class WireParser implements Closeable { private final static Log LOG = LogFactory.getLog(WireParser.class); + private static final IntIntHashMap typeOidMap = new IntIntHashMap(); + + static { + typeOidMap.put(ColumnType.STRING, 1043); // VARCHAR + typeOidMap.put(ColumnType.TIMESTAMP, 1184); // TIMESTAMPZ + typeOidMap.put(ColumnType.DOUBLE, 701); // FLOAT8 + typeOidMap.put(ColumnType.FLOAT, 700); + typeOidMap.put(ColumnType.INT, 23); // INT4 + typeOidMap.put(ColumnType.SHORT, 21); // INT2 + } - private int state = 0; private final NetworkFacade nf; private final long recvBuffer; private final long sendBuffer; private final int recvBufferSize; private final int sendBufferSize; + private final SqlCompiler compiler; + private final ResponseAsciiSink responseAsciiSink = new ResponseAsciiSink(); + private long sendBufferPtr; + private int state = 0; private long recvBufferOffset = 0; - public WireParser(WireParserConfiguration configuration) { + public WireParser(WireParserConfiguration configuration, CairoEngine engine) { this.nf = configuration.getNetworkFacade(); + this.compiler = new SqlCompiler(engine); this.recvBufferSize = configuration.getRecvBufferSize(); this.recvBuffer = Unsafe.malloc(this.recvBufferSize); this.sendBufferSize = configuration.getSendBufferSize(); this.sendBuffer = Unsafe.malloc(this.sendBufferSize); + this.sendBufferPtr = sendBuffer; } @Override public void close() { Unsafe.free(sendBuffer, sendBufferSize); Unsafe.free(recvBuffer, recvBufferSize); + Misc.free(compiler); } public void recv(long fd) throws PeerDisconnectedException, PeerIsSlowToReadException { @@ -73,7 +100,7 @@ public class WireParser implements Closeable { } final int n = Net.recv(fd, recvBuffer + recvBufferOffset, remaining); - if (n == -1) { + if (n < 0) { throw PeerDisconnectedException.INSTANCE; } @@ -95,30 +122,120 @@ public class WireParser implements Closeable { } private void executeParseAndSendResult(long fd, CharSequence query) { - long offset; - // send 'ParseComplete' - AuthenticationMsg.setType(sendBuffer, (byte) '1'); - AuthenticationMsg.setLen(sendBuffer, 4); + responseAsciiSink.reset(); + if (Chars.startsWith(query, "SET")) { + prepareParseComplete(); + prepareReadyForQuery(); + send(fd); + } else { + try { + try (RecordCursorFactory factory = compiler.compile(query)) { + if (factory != null) { + prepareRowDescription(factory.getMetadata()); + + RecordCursor cursor = factory.getCursor(); + Record record = cursor.getRecord(); + RecordMetadata metadata = factory.getMetadata(); + final int columnCount = metadata.getColumnCount(); + + while (cursor.hasNext()) { + responseAsciiSink.put('D'); // data + long b = responseAsciiSink.skip(); + long a; + responseAsciiSink.putNetworkShort((short) columnCount); + for (int i = 0; i < columnCount; i++) { + switch (metadata.getColumnType(i)) { + case ColumnType.INT: + a = responseAsciiSink.skip(); + responseAsciiSink.put(record.getInt(i)); + responseAsciiSink.putLenEx(a); + break; + case ColumnType.STRING: + responseAsciiSink.putNetworkInt(record.getStrLen(i)); + responseAsciiSink.encodeUtf8(record.getStr(i)); + break; + case ColumnType.TIMESTAMP: + responseAsciiSink.putNetworkInt(Long.BYTES); + responseAsciiSink.put(record.getTimestamp(i)); + break; + case ColumnType.DOUBLE: + responseAsciiSink.putNetworkInt(Double.BYTES); + responseAsciiSink.put(record.getDouble(i), 3); + break; + default: + assert false; + } + } + responseAsciiSink.putLen(b); + } + + prepareCommandComplete(); + prepareReadyForQuery(); + send(fd); + } else { + prepareParseComplete(); + prepareReadyForQuery(); + send(fd); + } + } + } catch (SqlException e) { + prepareError(e); + prepareReadyForQuery(); + send(fd); + } + } + } + + private void prepareCommandComplete() { + responseAsciiSink.put('C'); + long addr = responseAsciiSink.skip(); + responseAsciiSink.encodeUtf8("SELECT ").put(0).put(' ').put(0).put((char) 0); + responseAsciiSink.putLen(addr); + } + + private void prepareRowDescription(RecordMetadata metadata) { + responseAsciiSink.put('T'); + final long addr = responseAsciiSink.skip(); + final int n = metadata.getColumnCount(); + responseAsciiSink.putNetworkShort((short) n); + for (int i = 0; i < n; i++) { + responseAsciiSink.encodeUtf8Z(metadata.getColumnName(i)); + responseAsciiSink.putNetworkInt(0); + responseAsciiSink.putNetworkShort((short) 0); + responseAsciiSink.putNetworkInt(typeOidMap.get(metadata.getColumnType(i))); // type + responseAsciiSink.putNetworkShort((short) 0); // type size? + responseAsciiSink.putNetworkInt(0); // type mod? + responseAsciiSink.putNetworkShort((short) 0); // format code + } + + responseAsciiSink.putLen(addr); + } - offset = 5; -// Net.send(clientFd, sendBuffer, 5); + private void prepareNoData() { + responseAsciiSink.put('n'); + responseAsciiSink.putNetworkInt(Integer.BYTES); + } - // send 'ReadyForQuery' - ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z'); - ReadyForQueryMsg.setLen(sendBuffer + offset, 5); - ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I'); - Net.send(fd, sendBuffer, (int) (offset + 6)); + private void prepareError(SqlException e) { + responseAsciiSink.put('E'); + long addr = responseAsciiSink.skip(); + responseAsciiSink.put('M'); + responseAsciiSink.encodeUtf8Z(e.getFlyweightMessage()); + responseAsciiSink.put('S'); + responseAsciiSink.encodeUtf8Z("ERROR"); + responseAsciiSink.put('P').put(e.getPosition()).put((char) 0); + responseAsciiSink.putLen(addr); + } + private void prepareParseComplete() { + responseAsciiSink.put('1'); + responseAsciiSink.putNetworkInt(Integer.BYTES); } /** * returns address of where parsing stopped. If there are remaining bytes left * int the buffer they need to be passed again in parse function along with * any additional bytes received - * - * @param address - * @param len - * @return */ private boolean parse(long fd, long address, int len) { long limit = address + len; @@ -200,6 +317,7 @@ public class WireParser implements Closeable { } msgLen = AbstractTypePrefixedHeader.getLen(address); + assert msgLen > 0; // msgLen does not take into account type byte if (msgLen > remaining - 1) { @@ -235,6 +353,7 @@ public class WireParser implements Closeable { // possibly more, check QueryExecutionImpl.processResults() in PG driver for more info hi = getStringLength(lo, msgLimit); + assert hi >= lo; CharSequence preparedStatementName = new DirectByteCharSequence().of(lo, hi); LOG.info().$("prepared statement name: ").$(preparedStatementName).$(); @@ -262,45 +381,48 @@ public class WireParser implements Closeable { } private void sendClearTextPasswordChallenge(long fd) { - AuthenticationMsg.setType(sendBuffer, (byte) 'R'); - AuthenticationMsg.setLen(sendBuffer, 8); - AuthenticationMsg.setResponseCode(sendBuffer, 3); - Net.send(fd, sendBuffer, 9); - // todo: deal with incomplete send + responseAsciiSink.reset(); + responseAsciiSink.put('R'); + responseAsciiSink.putNetworkInt(8); + responseAsciiSink.putNetworkInt(3); + send(fd); } private void sendLoginOk(long fd) { - // send login ok - // send authentication challenge - AuthenticationMsg.setType(sendBuffer, (byte) 'R'); - AuthenticationMsg.setLen(sendBuffer, 8); - AuthenticationMsg.setResponseCode(sendBuffer, 0); -// Net.send(clientFd, sendBuffer, 9); - // length so far 9 - - // send 'ParameterStatus' - long offset = 9; - offset += ParameterStatusMsg.setParameterPair( - sendBuffer + offset, - "TimeZone", "GMT"); - - offset += ParameterStatusMsg.setParameterPair( - sendBuffer + offset, - "application_name", "QuestDB"); - - offset += ParameterStatusMsg.setParameterPair( - sendBuffer + offset, - "server_version_num", "100000"); - - offset += ParameterStatusMsg.setParameterPair( - sendBuffer + offset, - "integer_datetimes", "on"); - - // send 'ReadyForQuery' - ReadyForQueryMsg.setType(sendBuffer + offset, (byte) 'Z'); - ReadyForQueryMsg.setLen(sendBuffer + offset, 5); - ReadyForQueryMsg.setStatus(sendBuffer + offset, (byte) 'I'); - nf.send(fd, sendBuffer, (int) (offset + 6)); + responseAsciiSink.reset(); + prepareLoginOk(); + prepareParams("TimeZone", "GMT"); + prepareParams("application_name", "QuestDB"); + prepareParams("server_version_num", "100000"); + prepareParams("integer_datetimes", "on"); + prepareReadyForQuery(); + send(fd); + } + + private void send(long fd) { + // todo: deal with incomplete send + int n = nf.send(fd, sendBuffer, (int) (sendBufferPtr - sendBuffer)); + LOG.info().$("sent [n=").$(n).$(']').$(); + } + + private void prepareLoginOk() { + responseAsciiSink.put('R'); + responseAsciiSink.putNetworkInt(8); // length of this message + responseAsciiSink.putNetworkInt(0); // response code + } + + private void prepareReadyForQuery() { + responseAsciiSink.put('Z'); + responseAsciiSink.putNetworkInt(5); + responseAsciiSink.put('I'); + } + + private void prepareParams(String timeZone, String gmt) { + responseAsciiSink.put('S'); + final long addr = responseAsciiSink.skip(); + responseAsciiSink.encodeUtf8Z(timeZone); + responseAsciiSink.encodeUtf8Z(gmt); + responseAsciiSink.putLen(addr); } private long getStringLength(long x, long limit) { @@ -313,8 +435,53 @@ public class WireParser implements Closeable { return -1; } - @FunctionalInterface - public interface MessageHandler { - int onMessage(int action); + private class ResponseAsciiSink extends AbstractCharSink { + @Override + public CharSink put(CharSequence cs, int start, int end) { + for (int i = start; i < end; i++) { + Unsafe.getUnsafe().putByte(sendBufferPtr + i, (byte) cs.charAt(i)); + } + sendBufferPtr += (end - start); + return this; + } + + @Override + public CharSink put(char c) { + Unsafe.getUnsafe().putByte(sendBufferPtr++, (byte) c); + return this; + } + + public void putNetworkInt(int len) { + NetworkByteOrderUtils.putInt(sendBufferPtr, len); + sendBufferPtr += Integer.BYTES; + } + + public void putNetworkShort(short value) { + NetworkByteOrderUtils.putShort(sendBufferPtr, value); + sendBufferPtr += Short.BYTES; + } + + public void putLen(long start) { + NetworkByteOrderUtils.putInt(start, (int) (sendBufferPtr - start)); + } + + public void putLenEx(long start) { + NetworkByteOrderUtils.putInt(start, (int) (sendBufferPtr - start - Integer.BYTES)); + } + + long skip() { + long checkpoint = sendBufferPtr; + sendBufferPtr += Integer.BYTES; + return checkpoint; + } + + void reset() { + sendBufferPtr = sendBuffer; + } + + void encodeUtf8Z(CharSequence value) { + encodeUtf8(value); + Unsafe.getUnsafe().putByte(sendBufferPtr++, (byte) 0); + } } } diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/NetworkByteOrderUtils.java b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/NetworkByteOrderUtils.java index 93cae7efbd93ada0e427f5a78cf60ae891319bae..9c8f4938d380636966ca76d47e40b6f4a304b187 100644 --- a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/NetworkByteOrderUtils.java +++ b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/NetworkByteOrderUtils.java @@ -27,10 +27,10 @@ import com.questdb.std.Unsafe; public class NetworkByteOrderUtils { public static int getInt(long address) { - int b = Unsafe.getUnsafe().getByte(address); - b = (b << 8) | Unsafe.getUnsafe().getByte(address + 1); - b = (b << 8) | Unsafe.getUnsafe().getByte(address + 2); - return (b << 8) | Unsafe.getUnsafe().getByte(address + 3); + int b = Unsafe.getUnsafe().getByte(address) & 0xff; + b = (b << 8) | Unsafe.getUnsafe().getByte(address + 1) & 0xff; + b = (b << 8) | Unsafe.getUnsafe().getByte(address + 2) & 0xff; + return (b << 8) | Unsafe.getUnsafe().getByte(address + 3) & 0xff; } public static void putInt(long address, int value) { @@ -39,4 +39,9 @@ public class NetworkByteOrderUtils { Unsafe.getUnsafe().putByte(address + 2, (byte) (value >>> 8)); Unsafe.getUnsafe().putByte(address + 3, (byte) (value)); } + + public static void putShort(long address, short value) { + Unsafe.getUnsafe().putByte(address, (byte) (value >>> 8)); + Unsafe.getUnsafe().putByte(address + 1, (byte) (value)); + } } diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMD5PasswordMsg.java b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMD5PasswordMsg.java deleted file mode 100644 index 9f082dfa899b6a24c78a77ab6ec5137ef8331fdb..0000000000000000000000000000000000000000 --- a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMD5PasswordMsg.java +++ /dev/null @@ -1,28 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (C) 2014-2019 Appsicle - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - ******************************************************************************/ - -package com.questdb.cutlass.pgwire.codecs.out; - -public class AuthenticationMD5PasswordMsg extends AuthenticationMsg { - -} diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMsg.java b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMsg.java deleted file mode 100644 index 47b25c67f1a91b54ba3df130f90e9cfffc3a1dcf..0000000000000000000000000000000000000000 --- a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/AuthenticationMsg.java +++ /dev/null @@ -1,33 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (C) 2014-2019 Appsicle - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - ******************************************************************************/ - -package com.questdb.cutlass.pgwire.codecs.out; - -import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader; -import com.questdb.cutlass.pgwire.codecs.NetworkByteOrderUtils; - -public class AuthenticationMsg extends AbstractTypePrefixedHeader { - public static void setResponseCode(long address, int status) { - NetworkByteOrderUtils.putInt(address + AbstractTypePrefixedHeader.LEN, status); - } -} diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ParameterStatusMsg.java b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ParameterStatusMsg.java deleted file mode 100644 index 01d0704d4f8443ae082d35ff2cc2fceb4202e83d..0000000000000000000000000000000000000000 --- a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ParameterStatusMsg.java +++ /dev/null @@ -1,49 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (C) 2014-2019 Appsicle - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - ******************************************************************************/ - -package com.questdb.cutlass.pgwire.codecs.out; - -import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader; -import com.questdb.std.Unsafe; - -public class ParameterStatusMsg extends AbstractTypePrefixedHeader { - public static int setParameterPair(long address, CharSequence name, CharSequence value) { - setType(address, (byte) 'S'); - long p = address + AbstractTypePrefixedHeader.LEN; - final long start = p; - p = copyStringZ(p, name); - p = copyStringZ(p, value); - int len = (int) (p - start); - setLen(address, len + AbstractTypePrefixedHeader.LEN - 1); - return len + AbstractTypePrefixedHeader.LEN; - } - - private static long copyStringZ(long p, CharSequence value) { - for (int i = 0, m = value.length(); i < m; i++) { - Unsafe.getUnsafe().putByte(p++, (byte) value.charAt(i)); - } - Unsafe.getUnsafe().putByte(p, (byte) 0); - return p + 1; - } - -} diff --git a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ReadyForQueryMsg.java b/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ReadyForQueryMsg.java deleted file mode 100644 index d98367bac9ba783382686eed34db960d0dff269a..0000000000000000000000000000000000000000 --- a/core/src/main/java/com/questdb/cutlass/pgwire/codecs/out/ReadyForQueryMsg.java +++ /dev/null @@ -1,33 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (C) 2014-2019 Appsicle - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - ******************************************************************************/ - -package com.questdb.cutlass.pgwire.codecs.out; - -import com.questdb.cutlass.pgwire.codecs.AbstractTypePrefixedHeader; -import com.questdb.std.Unsafe; - -public class ReadyForQueryMsg extends AbstractTypePrefixedHeader { - public static void setStatus(long address, byte status) { - Unsafe.getUnsafe().putByte(address + AbstractTypePrefixedHeader.LEN, status); - } -} diff --git a/core/src/test/java/com/questdb/cutlass/pgwire/WireParserTest.java b/core/src/test/java/com/questdb/cutlass/pgwire/WireParserTest.java index 83af33cd3abaa9068f663c46f6e70369c01e5f66..e5e6c0ed01b65836914581dc4d93012271384b83 100644 --- a/core/src/test/java/com/questdb/cutlass/pgwire/WireParserTest.java +++ b/core/src/test/java/com/questdb/cutlass/pgwire/WireParserTest.java @@ -23,16 +23,15 @@ package com.questdb.cutlass.pgwire; +import com.questdb.griffin.AbstractGriffinTest; import com.questdb.network.*; import com.questdb.std.Os; import org.junit.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; +import java.sql.*; import java.util.Properties; -public class WireParserTest { +public class WireParserTest extends AbstractGriffinTest { @Test public void testSimple() throws SQLException { @@ -55,8 +54,7 @@ public class WireParserTest { public int getSendBufferSize() { return 1024 * 1024; } - }); - + }, engine); Net.setReusePort(fd); @@ -81,10 +79,16 @@ public class WireParserTest { properties.setProperty("sslmode", "disable"); final Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9120/nabu_app", properties); -// Statement statement = connection.createStatement(); -// statement.executeQuery("select * from tab"); + Statement statement = connection.createStatement(); +// ResultSet rs = statement.executeQuery("select rnd_str(4,4,0) s, timestamp_sequence(to_timestamp(0),10000) t, rnd_double(0) d, rnd_int() i from long_sequence(4000)2"); + ResultSet rs = statement.executeQuery("select rnd_str(4,4,0) s, rnd_int() i from long_sequence(5)"); + while (rs.next()) { + System.out.print(rs.getString(1)); + System.out.print(","); + System.out.print(rs.getInt(2)); + System.out.println(); + } connection.close(); - } else { throw NetworkError.instance(Os.errno()).couldNotBindSocket(); }