diff --git a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java index 543dab25ef60cdd85ca4de8c62211ed7162d241f..e1d84f919184b2de15015762439c62b5ca47c892 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java +++ b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java @@ -1203,13 +1203,12 @@ public class SqlCodeGenerator implements Mutable, Closeable { final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(columnIndexes.getQuick(latestByIndex)); if (nKeyValues > 1) { - return new LatestByValuesFilteredRecordCursorFactory( + return new LatestByDeferredListValuesFilteredRecordCursorFactory( configuration, metadata, dataFrameCursorFactory, latestByIndex, intrinsicModel.keyValueFuncs, - symbolMapReader, filter, columnIndexes ); @@ -1255,12 +1254,11 @@ public class SqlCodeGenerator implements Mutable, Closeable { prefixes ); } else { - return new LatestByAllFilteredRecordCursorFactory( - metadata, + return new LatestByDeferredListValuesFilteredRecordCursorFactory( configuration, + metadata, dataFrameCursorFactory, - RecordSinkFactory.getInstance(asm, metadata, listColumnFilterA, false), - keyTypes, + latestByIndex, filter, columnIndexes ); @@ -3022,15 +3020,31 @@ public class SqlCodeGenerator implements Mutable, Closeable { model.getLatestBy().clear(); // listColumnFilterA = latest by column indexes - if (latestByColumnCount == 1 && myMeta.isColumnIndexed(listColumnFilterA.getColumnIndexFactored(0))) { - return new LatestByAllIndexedRecordCursorFactory( - myMeta, - configuration, - new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()), - listColumnFilterA.getColumnIndexFactored(0), - columnIndexes, - prefixes - ); + if (latestByColumnCount == 1) { + int latestByColumnIndex = listColumnFilterA.getColumnIndexFactored(0); + if (myMeta.isColumnIndexed(latestByColumnIndex)) { + return new LatestByAllIndexedRecordCursorFactory( + myMeta, + configuration, + new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()), + listColumnFilterA.getColumnIndexFactored(0), + columnIndexes, + prefixes + ); + } + + if (ColumnType.isSymbol(myMeta.getColumnType(latestByColumnIndex)) + && myMeta.isSymbolTableStatic(latestByColumnIndex)) { + // we have "latest by" symbol column values, but no index + return new LatestByDeferredListValuesFilteredRecordCursorFactory( + configuration, + myMeta, + new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()), + latestByColumnIndex, + null, + columnIndexes + ); + } } return new LatestByAllFilteredRecordCursorFactory( diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/columns/SymbolColumn.java b/core/src/main/java/io/questdb/griffin/engine/functions/columns/SymbolColumn.java index 81c35766ea9f71b1c15f5d95307a08e75a3d8f3a..19d5039b8224c127993accfa9095880c0fa284ed 100644 --- a/core/src/main/java/io/questdb/griffin/engine/functions/columns/SymbolColumn.java +++ b/core/src/main/java/io/questdb/griffin/engine/functions/columns/SymbolColumn.java @@ -58,7 +58,7 @@ public class SymbolColumn extends SymbolFunction implements ScalarFunction { @Override public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) { this.symbolTable = symbolTableSource.getSymbolTable(columnIndex); - //static symbol table must be non-null + // static symbol table must be non-null assert !symbolTableStatic || symbolTable != null; } diff --git a/core/src/main/java/io/questdb/griffin/engine/table/LatestByDeferredListValuesFilteredRecordCursorFactory.java b/core/src/main/java/io/questdb/griffin/engine/table/LatestByDeferredListValuesFilteredRecordCursorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..ef3311eb56ca5f695670319e5eb9a68a2195fed7 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/table/LatestByDeferredListValuesFilteredRecordCursorFactory.java @@ -0,0 +1,121 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.table; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.sql.*; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.std.IntHashSet; +import io.questdb.std.IntList; +import io.questdb.std.ObjList; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Iterates table backwards and finds latest values by single symbol column. + * When the LATEST BY is applied to symbol column QuestDB knows all distinct symbol values + * and in many cases can stop before scanning all the data when it finds all the expected values + */ +public class LatestByDeferredListValuesFilteredRecordCursorFactory extends AbstractDataFrameRecordCursorFactory { + private final ObjList symbolFunctions; + private final Function filter; + private final LatestByValueListRecordCursor cursor; + private final int frameSymbolIndex; + + public LatestByDeferredListValuesFilteredRecordCursorFactory( + @NotNull CairoConfiguration configuration, + @NotNull RecordMetadata metadata, + @NotNull DataFrameCursorFactory dataFrameCursorFactory, + int columnIndex, + @Nullable ObjList symbolFunctions, + @Nullable Function filter, + @NotNull IntList columnIndexes + ) { + super(metadata, dataFrameCursorFactory); + this.symbolFunctions = symbolFunctions; + this.filter = filter; + this.frameSymbolIndex = columnIndexes.getQuick(columnIndex); + this.cursor = new LatestByValueListRecordCursor(columnIndex, filter, columnIndexes, configuration.getDefaultSymbolCapacity(), symbolFunctions != null); + } + + public LatestByDeferredListValuesFilteredRecordCursorFactory( + @NotNull CairoConfiguration configuration, + RecordMetadata metadata, + DataFrameCursorFactory dataFrameCursorFactory, + int latestByIndex, + Function filter, + IntList columnIndexes + ) { + this(configuration, metadata, dataFrameCursorFactory, latestByIndex, null, filter, columnIndexes); + } + + @Override + public void close() { + super.close(); + if (filter != null) { + filter.close(); + } + this.cursor.destroy(); + } + + @Override + public boolean recordCursorSupportsRandomAccess() { + return true; + } + + @Override + protected RecordCursor getCursorInstance( + DataFrameCursor dataFrameCursor, + SqlExecutionContext executionContext + ) throws SqlException { + lookupDeferredSymbol(dataFrameCursor, executionContext); + cursor.of(dataFrameCursor, executionContext); + return cursor; + } + + private void lookupDeferredSymbol(DataFrameCursor dataFrameCursor, SqlExecutionContext executionContext) throws SqlException { + // If symbol values are restricted by a list in the qyert by syntax + // sym in ('val1', 'val2', 'val3') + // or similar we need to resolve string values into int symbol keys to search the table faster. + // Resolve values to int keys and save them in cursor.getSymbolKeys() set. + if (symbolFunctions != null) { + // Re-evaluate symbol key resolution if not all known already + IntHashSet symbolKeys = cursor.getSymbolKeys(); + if (symbolKeys.size() < symbolFunctions.size()) { + symbolKeys.clear(); + StaticSymbolTable symbolMapReader = dataFrameCursor.getSymbolTable(frameSymbolIndex); + for (int i = 0, n = symbolFunctions.size(); i < n; i++) { + Function symbolFunc = symbolFunctions.getQuick(i); + symbolFunc.init(dataFrameCursor, executionContext); + int key = symbolMapReader.keyOf(symbolFunc.getStr(null)); + if (key != SymbolTable.VALUE_NOT_FOUND) { + symbolKeys.add(key); + } + } + } + } + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/table/LatestByValueListRecordCursor.java b/core/src/main/java/io/questdb/griffin/engine/table/LatestByValueListRecordCursor.java new file mode 100644 index 0000000000000000000000000000000000000000..1a83c6fd935ffe5b39bc4de8d1e2599c3f7dd1db --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/table/LatestByValueListRecordCursor.java @@ -0,0 +1,240 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.table; + +import io.questdb.cairo.sql.DataFrame; +import io.questdb.cairo.sql.DataFrameCursor; +import io.questdb.cairo.sql.Function; +import io.questdb.cairo.sql.StaticSymbolTable; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.std.*; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +class LatestByValueListRecordCursor extends AbstractDataFrameRecordCursor { + + private final int shrinkToCapacity; + private final int columnIndex; + private final Function filter; + private IntHashSet foundKeys; + private IntHashSet symbolKeys; + private final boolean restrictedByValues; + private DirectLongList rowIds; + private int currentRow; + + public LatestByValueListRecordCursor(int columnIndex, @Nullable Function filter, @NotNull IntList columnIndexes, int shrinkToCapacity, boolean restrictedByValues) { + super(columnIndexes); + this.shrinkToCapacity = shrinkToCapacity; + this.columnIndex = columnIndex; + this.filter = filter; + this.restrictedByValues = restrictedByValues; + if (restrictedByValues) { + this.symbolKeys = new IntHashSet(shrinkToCapacity); + } + this.foundKeys = new IntHashSet(shrinkToCapacity); + this.rowIds = new DirectLongList(shrinkToCapacity, MemoryTag.NATIVE_LONG_LIST); + } + + @Override + public void close() { + super.close(); + if (rowIds.size() > shrinkToCapacity) { + rowIds = Misc.free(rowIds); + rowIds = new DirectLongList(shrinkToCapacity, MemoryTag.NATIVE_LONG_LIST); + foundKeys = new IntHashSet(shrinkToCapacity); + // symbolKeys is unlikely to take too much memory + // because every value is associated with a value from `in (...)` WHERE filter and + // the list of parsed functions is of bigger size than symbolKeys hash set. + } + } + + @Override + void of(DataFrameCursor dataFrameCursor, SqlExecutionContext executionContext) throws SqlException { + this.dataFrameCursor = dataFrameCursor; + this.recordA.of(dataFrameCursor.getTableReader()); + this.recordB.of(dataFrameCursor.getTableReader()); + dataFrameCursor.toTop(); + foundKeys.clear(); + rowIds.clear(); + + // Find all record IDs and save in rowIds in descending order + // return then row by row in ascending timestamp order + // since most of the time factory is supposed to return in ASC timestamp order + // It can be optimised later on to not buffer row IDs and return in desc order. + if (restrictedByValues) { + if (symbolKeys.size() > 0) { + // Find only restricted set of symbol keys + rowIds.extend(symbolKeys.size()); + if (filter != null) { + filter.init(this, executionContext); + filter.toTop(); + findRestrictedWithFilter(filter, symbolKeys); + } else { + findRestrictedNoFilter(symbolKeys); + } + } + } else { + // Find latest by all distinct symbol values + StaticSymbolTable symbolTable = dataFrameCursor.getSymbolTable(columnIndexes.getQuick(columnIndex)); + int distinctSymbols = symbolTable.getSymbolCount(); + if (symbolTable.containsNullValue()) { + distinctSymbols++; + } + + rowIds.extend(distinctSymbols); + if (distinctSymbols > 0) { + if (filter != null) { + filter.init(this, executionContext); + filter.toTop(); + findAllWithFilter(filter, distinctSymbols); + } else { + findAllNoFilter(distinctSymbols); + } + } + } + toTop(); + } + + IntHashSet getSymbolKeys() { + return symbolKeys; + } + + public void destroy() { + // After close() the instance is designed to be re-usable. + // Destroy makes it non-reusable + rowIds = Misc.free(rowIds); + } + + @Override + public long size() { + return rowIds.size(); + } + + @Override + public boolean hasNext() { + if (currentRow-- > 0) { + long rowId = rowIds.get(currentRow); + recordAt(recordA, rowId); + return true; + } + return false; + } + + private void findAllNoFilter(int distinctCount) { + DataFrame frame = dataFrameCursor.next(); + int foundSize = 0; + while (frame != null) { + long rowLo = frame.getRowLo(); + long row = frame.getRowHi(); + recordA.jumpTo(frame.getPartitionIndex(), 0); + + while (row-- > rowLo) { + recordA.setRecordIndex(row); + int key = recordA.getInt(columnIndex); + if (foundKeys.add(key)) { + rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row)); + if (++foundSize == distinctCount) { + return; + } + } + } + frame = dataFrameCursor.next(); + } + } + + private void findAllWithFilter(Function filter, int distinctCount) { + DataFrame frame = dataFrameCursor.next(); + int foundSize = 0; + while (frame != null) { + long rowLo = frame.getRowLo(); + long row = frame.getRowHi(); + recordA.jumpTo(frame.getPartitionIndex(), 0); + + while (row-- > rowLo) { + recordA.setRecordIndex(row); + int key = recordA.getInt(columnIndex); + if (filter.getBool(recordA) && foundKeys.add(key)) { + rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row)); + if (++foundSize == distinctCount) { + return; + } + } + } + frame = dataFrameCursor.next(); + } + } + + private void findRestrictedNoFilter(IntHashSet symbolKeys) { + DataFrame frame = dataFrameCursor.next(); + int searchSize = symbolKeys.size(); + int foundSize = 0; + while (frame != null) { + long rowLo = frame.getRowLo(); + long row = frame.getRowHi(); + recordA.jumpTo(frame.getPartitionIndex(), 0); + + while (row-- > rowLo) { + recordA.setRecordIndex(row); + int key = recordA.getInt(columnIndex); + if (symbolKeys.contains(key) && foundKeys.add(key)) { + rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row)); + if (++foundSize == searchSize) { + return; + } + } + } + frame = dataFrameCursor.next(); + } + } + + private void findRestrictedWithFilter(Function filter, IntHashSet symbolKeys) { + DataFrame frame = dataFrameCursor.next(); + int searchSize = symbolKeys.size(); + int foundSize = 0; + while (frame != null) { + long rowLo = frame.getRowLo(); + long row = frame.getRowHi(); + recordA.jumpTo(frame.getPartitionIndex(), 0); + + while (row-- > rowLo) { + recordA.setRecordIndex(row); + int key = recordA.getInt(columnIndex); + if (filter.getBool(recordA) && symbolKeys.contains(key) && foundKeys.add(key)) { + rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row)); + if (++foundSize == searchSize) { + return; + } + } + } + frame = dataFrameCursor.next(); + } + } + + @Override + public void toTop() { + currentRow = (int) rowIds.size(); + } +} diff --git a/core/src/main/java/io/questdb/std/IntHashSet.java b/core/src/main/java/io/questdb/std/IntHashSet.java index 7cc2e9b8458a9d8a1555fd0c27f79cf216b7ca9b..393fe5de1dbcb5bb32149aec3e12d352e8d12645 100644 --- a/core/src/main/java/io/questdb/std/IntHashSet.java +++ b/core/src/main/java/io/questdb/std/IntHashSet.java @@ -84,8 +84,7 @@ public class IntHashSet extends AbstractIntHashSet { public final void clear() { free = capacity; - Arrays.fill(keys, noEntryKeyValue - ); + Arrays.fill(keys, noEntryKeyValue); list.clear(); } diff --git a/core/src/test/java/io/questdb/griffin/LatestByTest.java b/core/src/test/java/io/questdb/griffin/LatestByTest.java new file mode 100644 index 0000000000000000000000000000000000000000..851eb8e2b8afc807fd52817e9741ea6241ea4c88 --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/LatestByTest.java @@ -0,0 +1,381 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin; + +import io.questdb.cairo.sql.Record; +import io.questdb.cairo.sql.RecordCursor; +import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.std.Chars; +import io.questdb.std.Files; +import io.questdb.std.FilesFacadeImpl; +import io.questdb.std.str.LPSZ; +import io.questdb.std.str.StringSink; +import org.junit.Test; + +public class LatestByTest extends AbstractGriffinTest { + @Test + public void testLatestByDoesNotNeedFullScan() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + compile("create table t as (" + + "select rnd_symbol('a', 'b') s, timestamp_sequence(0, 60*60*1000*1000L) ts from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("s\tts\n" + + "a\t1970-01-02T23:00:00.000000Z\n" + + "b\t1970-01-03T00:00:00.000000Z\n", + "t " + + "where s in ('a', 'b') " + + "latest on ts partition by s", + "ts", + true, + true); + }); + } + + @Test + public void testLatestBySymbolEmpty() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan any partition, searched symbol values don't exist in symbol table + if (Chars.contains(name, "1970-01-01") || Chars.contains(name, "1970-01-02")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('g', 'd', 'f') s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(40)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("x\ts\tts\n", + "t where s in ('a', 'b') latest on ts partition by s", + "ts", + true, + true); + }); + } + + @Test + public void testLatestBySymbolManyDistinctValues() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in other partitions + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol(10000, 1, 15, 1000) s, " + + "timestamp_sequence(0, 1000*1000L) ts " + + "from long_sequence(1000000)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("min\tmax\n" + + "1970-01-11T15:33:16.000000Z\t1970-01-12T13:46:39.000000Z\n", + "select min(ts), max(ts) from (select ts, x, s from t latest on ts partition by s)", + null, + false, + true); + + assertQuery("min\tmax\n" + + "1970-01-11T20:55:39.000000Z\t1970-01-12T13:46:34.000000Z\n", + "select min(ts), max(ts) from (" + + "select ts, x, s " + + "from t " + + "where s in (" + selectDistinctSym("t", 500, "s") + ") " + + "latest on ts partition by s" + + ")", + null, + false, + true); + }); + } + + @Test + public void testLatestBySymbolUnfilteredDoesNotDoFullScan() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', null) s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("ts\tx\ts\n" + + "1970-01-02T22:00:00.000000Z\t47\tb\n" + + "1970-01-02T23:00:00.000000Z\t48\ta\n" + + "1970-01-03T00:00:00.000000Z\t49\t\n", + "select ts, x, s from t latest on ts partition by s", + "ts", + true, + true); + }); + } + + @Test + public void testLatestBySymbolWithNoNulls() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', 'c', 'd', 'e', 'f') s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("ts\tx\ts\n" + + "1970-01-02T17:00:00.000000Z\t42\td\n" + + "1970-01-02T19:00:00.000000Z\t44\te\n" + + "1970-01-02T21:00:00.000000Z\t46\tc\n" + + "1970-01-02T22:00:00.000000Z\t47\tb\n" + + "1970-01-02T23:00:00.000000Z\t48\ta\n" + + "1970-01-03T00:00:00.000000Z\t49\tf\n", + "select ts, x, s from t latest on ts partition by s", + "ts", + true, + true); + + assertQuery("ts\tx\ts\n" + + "1970-01-03T00:00:00.000000Z\t49\tf\n" + + "1970-01-02T19:00:00.000000Z\t44\te\n" + + "1970-01-02T17:00:00.000000Z\t42\td\n" + + "1970-01-02T21:00:00.000000Z\t46\tc\n" + + "1970-01-02T22:00:00.000000Z\t47\tb\n" + + "1970-01-02T23:00:00.000000Z\t48\ta\n", + "select ts, x, s from t latest on ts partition by s order by s desc", + null, + true, + true); + }); + } + + @Test + public void testLatestWithFilterByDoesNotNeedFullScan() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', null) s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("x\ts\tts\n" + + "44\tb\t1970-01-02T19:00:00.000000Z\n" + + "48\ta\t1970-01-02T23:00:00.000000Z\n", + "t " + + "where s in ('a', 'b') and x%2 = 0 " + + "latest on ts partition by s", + "ts", + true, + true); + }); + } + + @Test + public void testLatestWithFilterByDoesNotNeedFullScanValueNotInSymbolTable() throws Exception { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + assertQuery("x\ts\tts\n" + + "44\tb\t1970-01-02T19:00:00.000000Z\n" + + "48\ta\t1970-01-02T23:00:00.000000Z\n", + "t " + + "where s in ('a', 'b', 'c') and x%2 = 0 " + + "latest on ts partition by s", + "create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', null) s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY", + "ts", + "insert into t values (1000, 'c', '1970-01-02T20:00')", + "x\ts\tts\n" + + "44\tb\t1970-01-02T19:00:00.000000Z\n" + + "1000\tc\t1970-01-02T20:00:00.000000Z\n" + + "48\ta\t1970-01-02T23:00:00.000000Z\n", + true, + true, + true); + } + + @Test + public void testLatestWithNullInSymbolFilterDoesNotDoFullScan() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', null) s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("x\ts\tts\n" + + "48\ta\t1970-01-02T23:00:00.000000Z\n" + + "49\t\t1970-01-03T00:00:00.000000Z\n", + "t where s in ('a', null) latest on ts partition by s", + "ts", + true, + true); + }); + } + + @Test + public void testLatestWithoutSymbolFilterDoesNotDoFullScan() throws Exception { + assertMemoryLeak(() -> { + ff = new FilesFacadeImpl() { + @Override + public long openRO(LPSZ name) { + // Query should not scan the first partition + // all the latest values are in the second, third partition + if (Chars.contains(name, "1970-01-01")) { + return -1; + } + return Files.openRO(name); + } + }; + + compile("create table t as (" + + "select " + + "x, " + + "rnd_symbol('a', 'b', null) s, " + + "timestamp_sequence(0, 60*60*1000*1000L) ts " + + "from long_sequence(49)" + + ") timestamp(ts) Partition by DAY"); + + assertQuery("x\ts\tts\n" + + "35\ta\t1970-01-02T10:00:00.000000Z\n" + + "47\tb\t1970-01-02T22:00:00.000000Z\n" + + "49\t\t1970-01-03T00:00:00.000000Z\n", + "t where x%2 = 1 latest on ts partition by s", + "ts", + true, + true); + }); + } + + private String selectDistinctSym(String table, int count, String columnName) throws SqlException { + StringSink sink = new StringSink(); + try (RecordCursorFactory factory = compiler.compile("select distinct " + columnName + " from " + table + " limit " + count, sqlExecutionContext).getRecordCursorFactory()) { + try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) { + final Record record = cursor.getRecord(); + int i = 0; + while (cursor.hasNext()) { + if (i++ > 0) { + sink.put(','); + } + sink.put('\'').put(record.getSym(0)).put('\''); + } + } + } + return sink.toString(); + } +} diff --git a/core/src/test/java/io/questdb/test/tools/TestUtils.java b/core/src/test/java/io/questdb/test/tools/TestUtils.java index e360adaa76f53f2032bd427b097636cdcbbc8a29..a22c92b8c35bfbff871778e75bff55923128ad3b 100644 --- a/core/src/test/java/io/questdb/test/tools/TestUtils.java +++ b/core/src/test/java/io/questdb/test/tools/TestUtils.java @@ -407,7 +407,7 @@ public final class TestUtils { runnable.run(); Path.clearThreadLocals(); if (fileCount != Files.getOpenFileCount()) { - Assert.assertEquals(Files.getOpenFdDebugInfo(), fileCount, Files.getOpenFileCount()); + Assert.assertEquals("file descriptors " + Files.getOpenFdDebugInfo(), fileCount, Files.getOpenFileCount()); } // Checks that the same tag used for allocation and freeing native memory