diff --git a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java index ac2f0cb546eb054f3c5fb6b8a1d5f74686956855..7110e34fecb4c6737a78d02bf1ac2115d78972fc 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java +++ b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java @@ -1233,7 +1233,6 @@ public class SqlCodeGenerator implements Mutable { final RecordCursorFactory factory = generateSubQuery(model, executionContext); // we require timestamp - // todo: this looks like generic code final int timestampIndex = getTimestampIndex(model, factory); if (timestampIndex == -1) { Misc.free(factory); diff --git a/core/src/main/java/io/questdb/griffin/SqlParser.java b/core/src/main/java/io/questdb/griffin/SqlParser.java index dedacc9fbc4ed846f68673212c3a5038c00cf067..125e28353390508e9e91d946490b4699635fc01a 100644 --- a/core/src/main/java/io/questdb/griffin/SqlParser.java +++ b/core/src/main/java/io/questdb/griffin/SqlParser.java @@ -65,6 +65,7 @@ public final class SqlParser { private final PostOrderTreeTraversalAlgo.Visitor rewriteConcat0Ref = this::rewriteConcat0; private final PostOrderTreeTraversalAlgo.Visitor rewriteTypeQualifier0Ref = this::rewriteTypeQualifier0; private boolean subQueryMode = false; + SqlParser( CairoConfiguration configuration, SqlOptimiser optimiser, @@ -147,18 +148,18 @@ public final class SqlParser { } private void expectBy(GenericLexer lexer) throws SqlException { - CharSequence tok = optTok(lexer); - if (tok == null || !isByKeyword(tok)) { - throw SqlException.$((lexer.getPosition()), "'by' expected"); + if (isByKeyword(tok(lexer, "by"))) { + return; } + throw SqlException.$((lexer.getPosition()), "'by' expected"); } private ExpressionNode expectExpr(GenericLexer lexer) throws SqlException { - ExpressionNode n = expr(lexer, (QueryModel) null); - if (n == null) { - throw SqlException.$(lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition(), "Expression expected"); + final ExpressionNode n = expr(lexer, (QueryModel) null); + if (n != null) { + return n; } - return n; + throw SqlException.$(lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition(), "Expression expected"); } private int expectInt(GenericLexer lexer) throws SqlException { @@ -1421,11 +1422,6 @@ public final class SqlParser { return parent; } - private ExpressionNode rewriteTypeQualifier(ExpressionNode parent) throws SqlException { - traversalAlgo.traverse(parent, rewriteTypeQualifier0Ref); - return parent; - } - private void rewriteConcat0(ExpressionNode node) { if (node.type == ExpressionNode.OPERATION && isConcatOperator(node.token)) { node.type = ExpressionNode.FUNCTION; @@ -1465,6 +1461,15 @@ public final class SqlParser { } } + private ExpressionNode rewriteKnownStatements(ExpressionNode parent) throws SqlException { + return rewriteConcat(rewriteCase(rewriteCount(rewriteTypeQualifier(parent)))); + } + + private ExpressionNode rewriteTypeQualifier(ExpressionNode parent) throws SqlException { + traversalAlgo.traverse(parent, rewriteTypeQualifier0Ref); + return parent; + } + /** * Rewrites 'abc'::blah - type qualifier * @@ -1481,10 +1486,6 @@ public final class SqlParser { } } - private ExpressionNode rewriteKnownStatements(ExpressionNode parent) throws SqlException { - return rewriteConcat(rewriteCase(rewriteCount(rewriteTypeQualifier(parent)))); - } - private int toColumnType(GenericLexer lexer, CharSequence tok) throws SqlException { final int type = ColumnType.columnTypeOf(tok); if (type == -1) { diff --git a/core/src/main/java/io/questdb/griffin/engine/table/LatestByAllRecordCursor.java b/core/src/main/java/io/questdb/griffin/engine/table/LatestByAllRecordCursor.java index 6e23e2c9377f2a025b7cc586381b44cd559ae99a..9cc8a6d66739f63c9c394ac069f9c873aa5004db 100644 --- a/core/src/main/java/io/questdb/griffin/engine/table/LatestByAllRecordCursor.java +++ b/core/src/main/java/io/questdb/griffin/engine/table/LatestByAllRecordCursor.java @@ -47,24 +47,25 @@ class LatestByAllRecordCursor extends AbstractRecordListCursor { @Override protected void buildTreeMap(SqlExecutionContext executionContext) { - map.clear(); DataFrame frame; - while ((frame = this.dataFrameCursor.next()) != null) { - final int partitionIndex = frame.getPartitionIndex(); - final long rowLo = frame.getRowLo(); - final long rowHi = frame.getRowHi() - 1; + try { + while ((frame = this.dataFrameCursor.next()) != null) { + final int partitionIndex = frame.getPartitionIndex(); + final long rowLo = frame.getRowLo(); + final long rowHi = frame.getRowHi() - 1; - recordA.jumpTo(frame.getPartitionIndex(), rowHi); - for (long row = rowHi; row >= rowLo; row--) { - recordA.setRecordIndex(row); - MapKey key = map.withKey(); - key.put(recordA, recordSink); - if (key.create()) { - rows.add(Rows.toRowID(partitionIndex, row)); + recordA.jumpTo(frame.getPartitionIndex(), rowHi); + for (long row = rowHi; row >= rowLo; row--) { + recordA.setRecordIndex(row); + MapKey key = map.withKey(); + key.put(recordA, recordSink); + if (key.create()) { + rows.add(Rows.toRowID(partitionIndex, row)); + } } } + } finally { + map.clear(); } - - map.clear(); } } diff --git a/core/src/main/java/io/questdb/griffin/model/QueryModel.java b/core/src/main/java/io/questdb/griffin/model/QueryModel.java index 8ba3e288fe3d1c21b0ebb6649efab214f714de4e..bf754dc80bdb73849a90e39f9f1d40207169ac56 100644 --- a/core/src/main/java/io/questdb/griffin/model/QueryModel.java +++ b/core/src/main/java/io/questdb/griffin/model/QueryModel.java @@ -808,6 +808,9 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin if (getLatestBy().size() > 0) { sink.put(" latest by "); for (int i = 0, n = getLatestBy().size(); i < n; i++) { + if (i > 0) { + sink.put(','); + } getLatestBy().getQuick(i).toSink(sink); } } diff --git a/core/src/main/java/io/questdb/std/DirectLongList.java b/core/src/main/java/io/questdb/std/DirectLongList.java index ee52e77386cb2ccd36f08d9271a235cee5461a1d..75ab7411018557bb9a7d7ef5a43e885f1eaf4188 100644 --- a/core/src/main/java/io/questdb/std/DirectLongList.java +++ b/core/src/main/java/io/questdb/std/DirectLongList.java @@ -44,8 +44,8 @@ public class DirectLongList implements Mutable, Closeable { public DirectLongList(long capacity) { this.pow2 = 3; - this.address = Unsafe.malloc(this.capacity = ((capacity << 3) + Misc.CACHE_LINE_SIZE)); - this.start = this.pos = address + (address & (Misc.CACHE_LINE_SIZE - 1)); + this.address = Unsafe.malloc(this.capacity = ((capacity << 3))); + this.start = this.pos = address; this.limit = pos + ((capacity - 1) << 3); this.onePow2 = (1 << 3); } @@ -65,9 +65,9 @@ public class DirectLongList implements Mutable, Closeable { this.pos += count; } - public int binarySearch(long v) { - int low = 0; - int high = (int) ((pos - start) >> 3) - 1; + public long binarySearch(long v) { + long low = 0; + long high = ((pos - start) >> 3) - 1; while (low <= high) { @@ -75,7 +75,7 @@ public class DirectLongList implements Mutable, Closeable { return scanSearch(v, low, high); } - int mid = (low + high) >>> 1; + long mid = (low + high) >>> 1; long midVal = Unsafe.getUnsafe().getLong(start + (mid << 3)); if (midVal < v) @@ -109,8 +109,8 @@ public class DirectLongList implements Mutable, Closeable { return Unsafe.getUnsafe().getLong(start + (p << 3)); } - public int scanSearch(long v, int low, int high) { - for (int i = low; i < high; i++) { + public long scanSearch(long v, long low, long high) { + for (long i = low; i < high; i++) { long f = get(i); if (f == v) { return i; @@ -141,7 +141,7 @@ public class DirectLongList implements Mutable, Closeable { return (int) ((pos - start) >> pow2); } - public DirectLongList subset(int lo, int hi) { + public DirectLongList subset(long lo, long hi) { DirectLongList that = new DirectLongList(hi - lo); Unsafe.getUnsafe().copyMemory(start + (lo << 3), that.start, (hi - lo) << 3); that.pos += (hi - lo) << 3; @@ -175,11 +175,10 @@ public class DirectLongList implements Mutable, Closeable { private void extend(long capacity) { final long oldCapacity = this.capacity; long address = Unsafe.realloc(this.address, oldCapacity, this.capacity = ((capacity << pow2) + Misc.CACHE_LINE_SIZE)); - long start = address + (address & (Misc.CACHE_LINE_SIZE - 1)); - this.pos = this.pos - this.start + start; - this.limit = start + ((capacity - 1) << pow2); + this.pos = this.pos - this.start + address; + this.limit = address + ((capacity - 1) << pow2); this.address = address; - this.start = start; - LOG.info().$("resized [old=").$(oldCapacity).$(", new=").$(this.capacity).$(']').$(); + this.start = address; + LOG.debug().$("resized [old=").$(oldCapacity).$(", new=").$(this.capacity).$(']').$(); } } diff --git a/core/src/test/java/io/questdb/cutlass/http/RetryIODispatcherTest.java b/core/src/test/java/io/questdb/cutlass/http/RetryIODispatcherTest.java index ad451f73c708b49e0af3170ec544546295d41f79..d3836689a1151700a767b7283576a65851ac0810 100644 --- a/core/src/test/java/io/questdb/cutlass/http/RetryIODispatcherTest.java +++ b/core/src/test/java/io/questdb/cutlass/http/RetryIODispatcherTest.java @@ -25,15 +25,20 @@ package io.questdb.cutlass.http; import io.questdb.cairo.CairoEngine; -import io.questdb.cairo.TableWriter; import io.questdb.cairo.EntryUnavailableException; +import io.questdb.cairo.TableWriter; import io.questdb.cairo.security.AllowAllCairoSecurityContext; import io.questdb.cutlass.http.processors.TextImportProcessor; import io.questdb.log.Log; import io.questdb.log.LogFactory; -import io.questdb.network.*; +import io.questdb.network.NetworkFacade; +import io.questdb.network.NetworkFacadeImpl; +import io.questdb.network.ServerDisconnectException; import org.jetbrains.annotations.NotNull; -import org.junit.*; +import org.junit.Assert; +import org.junit.ComparisonFailure; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.util.concurrent.CountDownLatch; @@ -126,24 +131,24 @@ public class RetryIODispatcherTest { public TemporaryFolder temp = new TemporaryFolder(); @Test - public void testInsertWaitsWhenWriterLockedLoop() throws Exception { + public void testImportProcessedWhenClientDisconnectedLoop() throws Exception { for (int i = 0; i < 10; i++) { System.out.println("*************************************************************************************"); System.out.println("************************** Run " + i + " ********************************"); System.out.println("*************************************************************************************"); - testInsertWaitsWhenWriterLocked(); + assertImportProcessedWhenClientDisconnected(); temp.delete(); temp.create(); } } @Test - public void testInsertsIsPerformedWhenWriterLockedAndDisconnectedLoop() throws Exception { - for (int i = 0; i < 10; i++) { + public void testInsertWaitsExceedsRerunProcessingQueueSizeLoop() throws Exception { + for (int i = 0; i < 5; i++) { System.out.println("*************************************************************************************"); System.out.println("************************** Run " + i + " ********************************"); System.out.println("*************************************************************************************"); - testInsertsIsPerformedWhenWriterLockedAndDisconnected(); + assertInsertWaitsExceedsRerunProcessingQueueSize(); temp.delete(); temp.create(); } @@ -170,24 +175,24 @@ public class RetryIODispatcherTest { } @Test - public void testImportProcessedWhenClientDisconnectedLoop() throws Exception { + public void testInsertWaitsWhenWriterLockedLoop() throws Exception { for (int i = 0; i < 10; i++) { System.out.println("*************************************************************************************"); System.out.println("************************** Run " + i + " ********************************"); System.out.println("*************************************************************************************"); - testImportProcessedWhenClientDisconnected(); + assertInsertWaitsWhenWriterLocked(); temp.delete(); temp.create(); } } @Test - public void testInsertWaitsExceedsRerunProcessingQueueSizeLoop() throws Exception { - for (int i = 0; i < 5; i++) { + public void testInsertsIsPerformedWhenWriterLockedAndDisconnectedLoop() throws Exception { + for (int i = 0; i < 10; i++) { System.out.println("*************************************************************************************"); System.out.println("************************** Run " + i + " ********************************"); System.out.println("*************************************************************************************"); - testInsertWaitsExceedsRerunProcessingQueueSize(); + assertInsertsIsPerformedWhenWriterLockedAndDisconnected(); temp.delete(); temp.create(); } @@ -285,40 +290,30 @@ public class RetryIODispatcherTest { } } - public void testInsertWaitsWhenWriterLocked() throws Exception { + private void assertImportProcessedWhenClientDisconnected() throws Exception { final int parallelCount = 2; new HttpQueryTestBuilder() .withTempFolder(temp) - .withWorkerCount(parallelCount) + .withWorkerCount(2) .withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()) .withTelemetry(false) - .run(engine -> { - // create table - new SendAndReceiveRequestBuilder().executeWithStandardHeaders( - "GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", - "0c\r\n" + - "{\"ddl\":\"OK\"}\r\n" + - "00\r\n" + - "\r\n" - ); + .run((engine) -> { + // create table and do 1 import + new SendAndReceiveRequestBuilder().execute(ValidImportRequest, ValidImportResponse); - TableWriter writer = lockWriter(engine, "balances_x"); + TableWriter writer = lockWriter(engine, "fhv_tripdata_2017-02.csv"); - final int insertCount = 10; + final int validRequestRecordCount = 24; + final int insertCount = 1; CountDownLatch countDownLatch = new CountDownLatch(parallelCount); for (int i = 0; i < parallelCount; i++) { + int finalI = i; new Thread(() -> { try { for (int r = 0; r < insertCount; r++) { // insert one record try { - new SendAndReceiveRequestBuilder().executeWithStandardHeaders( - "GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(1%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n", - "0c\r\n" + - "{\"ddl\":\"OK\"}\r\n" + - "00\r\n" + - "\r\n" - ); + new SendAndReceiveRequestBuilder().execute(ValidImportRequest, ""); } catch (Exception e) { LOG.error().$("Failed execute insert http request. Server error ").$(e).$(); } @@ -326,30 +321,39 @@ public class RetryIODispatcherTest { } finally { countDownLatch.countDown(); } + LOG.info().$("Stopped thread ").$(finalI).$(); }).start(); } - boolean finished = countDownLatch.await(200, TimeUnit.MILLISECONDS); - - // Cairo engine should not allow second writer to be opened on the same table - // Cairo is expected to have finished == false - Assert.assertFalse(finished); + countDownLatch.await(); + // Cairo engine should not allow second writer to be opened on the same table, all requests should wait for the writer to be available writer.close(); - countDownLatch.await(); - // check if we have parallelCount x insertCount records - new SendAndReceiveRequestBuilder().executeWithStandardHeaders( - "GET /query?query=select+count(*)+from+balances_x&count=true HTTP/1.1\r\n", - "71\r\n" + - "{\"query\":\"select count(*) from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + parallelCount * insertCount + "]],\"count\":1}\r\n" + - "00\r\n" + - "\r\n" - ); + for (int i = 0; i < 20; i++) { + + try { + // check if we have parallelCount x insertCount records + new SendAndReceiveRequestBuilder().executeWithStandardHeaders( + "GET /query?query=select+count(*)+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", + "83\r\n" + + "{\"query\":\"select count(*) from \\\"fhv_tripdata_2017-02.csv\\\"\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + (parallelCount + 1) * validRequestRecordCount + "]],\"count\":1}\r\n" + + "00\r\n" + + "\r\n"); + return; + } catch (ComparisonFailure e) { + if (i < 9) { + Thread.sleep(50); + } else { + throw e; + } + } + } + }); } - public void testInsertWaitsExceedsRerunProcessingQueueSize() throws Exception { + private void assertInsertWaitsExceedsRerunProcessingQueueSize() throws Exception { final int rerunProcessingQueueSize = 1; final int parallelCount = 4; new HttpQueryTestBuilder() @@ -417,8 +421,8 @@ public class RetryIODispatcherTest { } - public void testInsertsIsPerformedWhenWriterLockedAndDisconnected() throws Exception { - final int parallelCount = 4; + private void assertInsertWaitsWhenWriterLocked() throws Exception { + final int parallelCount = 2; new HttpQueryTestBuilder() .withTempFolder(temp) .withWorkerCount(parallelCount) @@ -435,58 +439,49 @@ public class RetryIODispatcherTest { ); TableWriter writer = lockWriter(engine, "balances_x"); + + final int insertCount = 10; CountDownLatch countDownLatch = new CountDownLatch(parallelCount); - Thread[] threads = new Thread[parallelCount]; for (int i = 0; i < parallelCount; i++) { - int finalI = i; - threads[i] = new Thread(() -> { + new Thread(() -> { try { - // insert one record - // await nothing - try { - Thread.sleep(finalI * 5); - new SendAndReceiveRequestBuilder() - .withPauseBetweenSendAndReceive(200) - .execute( - "GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(" + finalI + "%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n" - + SendAndReceiveRequestBuilder.RequestHeaders, - "" - ); - } catch (Exception e) { - LOG.error().$("Failed execute insert http request. Server error ").$(e); + for (int r = 0; r < insertCount; r++) { + // insert one record + try { + new SendAndReceiveRequestBuilder().executeWithStandardHeaders( + "GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(1%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n", + "0c\r\n" + + "{\"ddl\":\"OK\"}\r\n" + + "00\r\n" + + "\r\n" + ); + } catch (Exception e) { + LOG.error().$("Failed execute insert http request. Server error ").$(e).$(); + } } } finally { countDownLatch.countDown(); } - }); - threads[i].start(); + }).start(); } - countDownLatch.await(); - writer.close(); + boolean finished = countDownLatch.await(200, TimeUnit.MILLISECONDS); - // check if we have parallelCount x insertCount records - int waitCount = 1000 / 50 * parallelCount; - for (int i = 0; i < waitCount; i++) { + // Cairo engine should not allow second writer to be opened on the same table + // Cairo is expected to have finished == false + Assert.assertFalse(finished); - try { - new SendAndReceiveRequestBuilder().executeWithStandardHeaders( - "GET /query?query=select+count()+from+balances_x&count=true HTTP/1.1\r\n", - "6f\r\n" + - "{\"query\":\"select count() from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + parallelCount + "]],\"count\":1}\r\n" + - "00\r\n" + - "\r\n" - ); - return; - } catch (ComparisonFailure e) { - if (i < waitCount - 1) { - Thread.sleep(50); - } else { - throw e; - } + writer.close(); + countDownLatch.await(); - } - } + // check if we have parallelCount x insertCount records + new SendAndReceiveRequestBuilder().executeWithStandardHeaders( + "GET /query?query=select+count(*)+from+balances_x&count=true HTTP/1.1\r\n", + "71\r\n" + + "{\"query\":\"select count(*) from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + parallelCount * insertCount + "]],\"count\":1}\r\n" + + "00\r\n" + + "\r\n" + ); }); } @@ -569,66 +564,76 @@ public class RetryIODispatcherTest { }); } - public void testImportProcessedWhenClientDisconnected() throws Exception { - final int parallelCount = 2; + private void assertInsertsIsPerformedWhenWriterLockedAndDisconnected() throws Exception { + final int parallelCount = 4; new HttpQueryTestBuilder() .withTempFolder(temp) - .withWorkerCount(2) + .withWorkerCount(parallelCount) .withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()) .withTelemetry(false) - .run((engine) -> { - // create table and do 1 import - new SendAndReceiveRequestBuilder().execute(ValidImportRequest, ValidImportResponse); - - TableWriter writer = lockWriter(engine, "fhv_tripdata_2017-02.csv"); + .run(engine -> { + // create table + new SendAndReceiveRequestBuilder().executeWithStandardHeaders( + "GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", + "0c\r\n" + + "{\"ddl\":\"OK\"}\r\n" + + "00\r\n" + + "\r\n" + ); - final int validRequestRecordCount = 24; - final int insertCount = 1; + TableWriter writer = lockWriter(engine, "balances_x"); CountDownLatch countDownLatch = new CountDownLatch(parallelCount); + Thread[] threads = new Thread[parallelCount]; for (int i = 0; i < parallelCount; i++) { int finalI = i; - new Thread(() -> { + threads[i] = new Thread(() -> { try { - for (int r = 0; r < insertCount; r++) { - // insert one record - try { - new SendAndReceiveRequestBuilder().execute(ValidImportRequest, ""); - } catch (Exception e) { - LOG.error().$("Failed execute insert http request. Server error ").$(e).$(); - } + // insert one record + // await nothing + try { + Thread.sleep(finalI * 5); + new SendAndReceiveRequestBuilder() + .withPauseBetweenSendAndReceive(200) + .execute( + "GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(" + finalI + "%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n" + + SendAndReceiveRequestBuilder.RequestHeaders, + "" + ); + } catch (Exception e) { + LOG.error().$("Failed execute insert http request. Server error ").$(e); } } finally { countDownLatch.countDown(); } - LOG.info().$("Stopped thread ").$(finalI).$(); - }).start(); + }); + threads[i].start(); } countDownLatch.await(); - - // Cairo engine should not allow second writer to be opened on the same table, all requests should wait for the writer to be available writer.close(); - for (int i = 0; i < 20; i++) { + // check if we have parallelCount x insertCount records + int waitCount = 1000 / 50 * parallelCount; + for (int i = 0; i < waitCount; i++) { try { - // check if we have parallelCount x insertCount records new SendAndReceiveRequestBuilder().executeWithStandardHeaders( - "GET /query?query=select+count(*)+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", - "83\r\n" + - "{\"query\":\"select count(*) from \\\"fhv_tripdata_2017-02.csv\\\"\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + (parallelCount + 1) * validRequestRecordCount + "]],\"count\":1}\r\n" + + "GET /query?query=select+count()+from+balances_x&count=true HTTP/1.1\r\n", + "6f\r\n" + + "{\"query\":\"select count() from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + parallelCount + "]],\"count\":1}\r\n" + "00\r\n" + - "\r\n"); + "\r\n" + ); return; } catch (ComparisonFailure e) { - if (i < 9) { + if (i < waitCount - 1) { Thread.sleep(50); } else { throw e; } + } } - }); } diff --git a/core/src/test/java/io/questdb/griffin/AbstractGriffinTest.java b/core/src/test/java/io/questdb/griffin/AbstractGriffinTest.java index 3f50b6f6133a5cb776a9bb1f0927d065bd29d815..195274bd393d85f5c570bfefc715042e261a366d 100644 --- a/core/src/test/java/io/questdb/griffin/AbstractGriffinTest.java +++ b/core/src/test/java/io/questdb/griffin/AbstractGriffinTest.java @@ -361,10 +361,14 @@ public class AbstractGriffinTest extends AbstractCairoTest { long timestamp = Long.MIN_VALUE; try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) { final Record record = cursor.getRecord(); + long c = 0; while (cursor.hasNext()) { long ts = record.getTimestamp(index); - Assert.assertTrue(timestamp <= ts); + if (timestamp > ts) { + Assert.fail("record #" + c); + } timestamp = ts; + c++; } } } diff --git a/core/src/test/java/io/questdb/griffin/SqlCodeGeneratorTest.java b/core/src/test/java/io/questdb/griffin/SqlCodeGeneratorTest.java index b048e4d99787ba87fb502fd60ecdf2bad1ff6f1f..bef8c2c3eae42807be9497f8ca38ea9cd4ff008d 100644 --- a/core/src/test/java/io/questdb/griffin/SqlCodeGeneratorTest.java +++ b/core/src/test/java/io/questdb/griffin/SqlCodeGeneratorTest.java @@ -2504,6 +2504,39 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest { ); } + @Test + public void testLatestByTimestampInclusion() throws Exception { + assertQuery("ts\tmarket_type\tavg\n" + + "1970-01-01T00:00:00.000000Z\taaa\t0.49992728629932576\n" + + "1970-01-01T00:00:00.000000Z\tbbb\t0.500285563758478\n" + + "1970-01-01T00:00:01.000000Z\taaa\t0.500040169925671\n" + + "1970-01-01T00:00:01.000000Z\tbbb\t0.5008686113849173\n" + + "1970-01-01T00:00:02.000000Z\taaa\t0.49977074601999855\n" + + "1970-01-01T00:00:02.000000Z\tbbb\t0.4999258418217269\n" + + "1970-01-01T00:00:03.000000Z\taaa\t0.5003595019568708\n" + + "1970-01-01T00:00:03.000000Z\tbbb\t0.5002857992170555\n" + + "1970-01-01T00:00:04.000000Z\tbbb\t0.4997116251279621\n" + + "1970-01-01T00:00:04.000000Z\taaa\t0.5006208473770267\n" + + "1970-01-01T00:00:05.000000Z\tbbb\t0.49988619432529985\n" + + "1970-01-01T00:00:05.000000Z\taaa\t0.5002852550150528\n" + + "1970-01-01T00:00:06.000000Z\taaa\t0.4998229395659802\n" + + "1970-01-01T00:00:06.000000Z\tbbb\t0.4997012831335711\n" + + "1970-01-01T00:00:07.000000Z\tbbb\t0.49945806525231845\n" + + "1970-01-01T00:00:07.000000Z\taaa\t0.4995901449794158\n" + + "1970-01-01T00:00:08.000000Z\taaa\t0.5002616949495469\n" + + "1970-01-01T00:00:08.000000Z\tbbb\t0.5005399447758458\n" + + "1970-01-01T00:00:09.000000Z\taaa\t0.5003054203632804\n" + + "1970-01-01T00:00:09.000000Z\tbbb\t0.500094369884023\n", + "select ts, market_type, avg(bid_price) FROM market_updates LATEST BY ts, market_type SAMPLE BY 1s", + "create table market_updates as (select rnd_symbol('aaa','bbb') market_type, rnd_double() bid_price, timestamp_sequence(0,1) ts from long_sequence(10000000)" + + ") timestamp(ts)", + "ts", + false, + true, + false + ); + } + @Test public void testLatestByNonExistingColumn() throws Exception { assertFailure( diff --git a/core/src/test/java/io/questdb/griffin/SqlParserTest.java b/core/src/test/java/io/questdb/griffin/SqlParserTest.java index 90f66bac14bf2ceb0e863a2ace8040198df8c99e..d319d2b6d4e62577a1acd13c0a40a060b2035265 100644 --- a/core/src/test/java/io/questdb/griffin/SqlParserTest.java +++ b/core/src/test/java/io/questdb/griffin/SqlParserTest.java @@ -3181,11 +3181,11 @@ public class SqlParserTest extends AbstractGriffinTest { } @Test - public void testLatestBySyntax() throws Exception { - assertSyntaxError( - "select * from tab latest", - 24, - "'by' expected" + public void testLatestByMultipleColumns() throws SqlException { + assertQuery( + "select-group-by ts, market_type, avg(bid_price) avg from (select [ts, market_type, bid_price] from market_updates timestamp (ts) latest by ts,market_type) sample by 1s", + "select ts, market_type, avg(bid_price) FROM market_updates LATEST BY ts, market_type SAMPLE BY 1s", + modelOf("market_updates").timestamp("ts").col("market_type", ColumnType.SYMBOL).col("bid_price", ColumnType.DOUBLE) ); } @@ -5427,6 +5427,15 @@ public class SqlParserTest extends AbstractGriffinTest { ); } + @Test + public void testLatestBySyntax() throws Exception { + assertSyntaxError( + "select * from tab latest", + 24, + "by expected" + ); + } + @Test public void testUnionMoveWhereIntoSubQuery() throws Exception { assertQuery(