未验证 提交 af41e92d 编写于 作者: A Alex Pelagenko 提交者: GitHub

feat(sql): optimise latest by symbol column with no index (#2010)

Optimises SQL queries similar to

```
tbl 
LATEST on timestamp PARTITION BY sym
```

when symbol column `sym` is not indexed. Uses the fact that there are known numbers of searched symbol values and stops scan when it finds the latest values for all the distinct values.
上级 e82f7104
......@@ -1203,13 +1203,12 @@ public class SqlCodeGenerator implements Mutable, Closeable {
final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(columnIndexes.getQuick(latestByIndex));
if (nKeyValues > 1) {
return new LatestByValuesFilteredRecordCursorFactory(
return new LatestByDeferredListValuesFilteredRecordCursorFactory(
configuration,
metadata,
dataFrameCursorFactory,
latestByIndex,
intrinsicModel.keyValueFuncs,
symbolMapReader,
filter,
columnIndexes
);
......@@ -1255,12 +1254,11 @@ public class SqlCodeGenerator implements Mutable, Closeable {
prefixes
);
} else {
return new LatestByAllFilteredRecordCursorFactory(
metadata,
return new LatestByDeferredListValuesFilteredRecordCursorFactory(
configuration,
metadata,
dataFrameCursorFactory,
RecordSinkFactory.getInstance(asm, metadata, listColumnFilterA, false),
keyTypes,
latestByIndex,
filter,
columnIndexes
);
......@@ -3022,15 +3020,31 @@ public class SqlCodeGenerator implements Mutable, Closeable {
model.getLatestBy().clear();
// listColumnFilterA = latest by column indexes
if (latestByColumnCount == 1 && myMeta.isColumnIndexed(listColumnFilterA.getColumnIndexFactored(0))) {
return new LatestByAllIndexedRecordCursorFactory(
myMeta,
configuration,
new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()),
listColumnFilterA.getColumnIndexFactored(0),
columnIndexes,
prefixes
);
if (latestByColumnCount == 1) {
int latestByColumnIndex = listColumnFilterA.getColumnIndexFactored(0);
if (myMeta.isColumnIndexed(latestByColumnIndex)) {
return new LatestByAllIndexedRecordCursorFactory(
myMeta,
configuration,
new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()),
listColumnFilterA.getColumnIndexFactored(0),
columnIndexes,
prefixes
);
}
if (ColumnType.isSymbol(myMeta.getColumnType(latestByColumnIndex))
&& myMeta.isSymbolTableStatic(latestByColumnIndex)) {
// we have "latest by" symbol column values, but no index
return new LatestByDeferredListValuesFilteredRecordCursorFactory(
configuration,
myMeta,
new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableId(), model.getTableVersion()),
latestByColumnIndex,
null,
columnIndexes
);
}
}
return new LatestByAllFilteredRecordCursorFactory(
......
......@@ -58,7 +58,7 @@ public class SymbolColumn extends SymbolFunction implements ScalarFunction {
@Override
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
this.symbolTable = symbolTableSource.getSymbolTable(columnIndex);
//static symbol table must be non-null
// static symbol table must be non-null
assert !symbolTableStatic || symbolTable != null;
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.engine.table;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.IntHashSet;
import io.questdb.std.IntList;
import io.questdb.std.ObjList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Iterates table backwards and finds latest values by single symbol column.
* When the LATEST BY is applied to symbol column QuestDB knows all distinct symbol values
* and in many cases can stop before scanning all the data when it finds all the expected values
*/
public class LatestByDeferredListValuesFilteredRecordCursorFactory extends AbstractDataFrameRecordCursorFactory {
private final ObjList<Function> symbolFunctions;
private final Function filter;
private final LatestByValueListRecordCursor cursor;
private final int frameSymbolIndex;
public LatestByDeferredListValuesFilteredRecordCursorFactory(
@NotNull CairoConfiguration configuration,
@NotNull RecordMetadata metadata,
@NotNull DataFrameCursorFactory dataFrameCursorFactory,
int columnIndex,
@Nullable ObjList<Function> symbolFunctions,
@Nullable Function filter,
@NotNull IntList columnIndexes
) {
super(metadata, dataFrameCursorFactory);
this.symbolFunctions = symbolFunctions;
this.filter = filter;
this.frameSymbolIndex = columnIndexes.getQuick(columnIndex);
this.cursor = new LatestByValueListRecordCursor(columnIndex, filter, columnIndexes, configuration.getDefaultSymbolCapacity(), symbolFunctions != null);
}
public LatestByDeferredListValuesFilteredRecordCursorFactory(
@NotNull CairoConfiguration configuration,
RecordMetadata metadata,
DataFrameCursorFactory dataFrameCursorFactory,
int latestByIndex,
Function filter,
IntList columnIndexes
) {
this(configuration, metadata, dataFrameCursorFactory, latestByIndex, null, filter, columnIndexes);
}
@Override
public void close() {
super.close();
if (filter != null) {
filter.close();
}
this.cursor.destroy();
}
@Override
public boolean recordCursorSupportsRandomAccess() {
return true;
}
@Override
protected RecordCursor getCursorInstance(
DataFrameCursor dataFrameCursor,
SqlExecutionContext executionContext
) throws SqlException {
lookupDeferredSymbol(dataFrameCursor, executionContext);
cursor.of(dataFrameCursor, executionContext);
return cursor;
}
private void lookupDeferredSymbol(DataFrameCursor dataFrameCursor, SqlExecutionContext executionContext) throws SqlException {
// If symbol values are restricted by a list in the qyert by syntax
// sym in ('val1', 'val2', 'val3')
// or similar we need to resolve string values into int symbol keys to search the table faster.
// Resolve values to int keys and save them in cursor.getSymbolKeys() set.
if (symbolFunctions != null) {
// Re-evaluate symbol key resolution if not all known already
IntHashSet symbolKeys = cursor.getSymbolKeys();
if (symbolKeys.size() < symbolFunctions.size()) {
symbolKeys.clear();
StaticSymbolTable symbolMapReader = dataFrameCursor.getSymbolTable(frameSymbolIndex);
for (int i = 0, n = symbolFunctions.size(); i < n; i++) {
Function symbolFunc = symbolFunctions.getQuick(i);
symbolFunc.init(dataFrameCursor, executionContext);
int key = symbolMapReader.keyOf(symbolFunc.getStr(null));
if (key != SymbolTable.VALUE_NOT_FOUND) {
symbolKeys.add(key);
}
}
}
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.engine.table;
import io.questdb.cairo.sql.DataFrame;
import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.StaticSymbolTable;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
class LatestByValueListRecordCursor extends AbstractDataFrameRecordCursor {
private final int shrinkToCapacity;
private final int columnIndex;
private final Function filter;
private IntHashSet foundKeys;
private IntHashSet symbolKeys;
private final boolean restrictedByValues;
private DirectLongList rowIds;
private int currentRow;
public LatestByValueListRecordCursor(int columnIndex, @Nullable Function filter, @NotNull IntList columnIndexes, int shrinkToCapacity, boolean restrictedByValues) {
super(columnIndexes);
this.shrinkToCapacity = shrinkToCapacity;
this.columnIndex = columnIndex;
this.filter = filter;
this.restrictedByValues = restrictedByValues;
if (restrictedByValues) {
this.symbolKeys = new IntHashSet(shrinkToCapacity);
}
this.foundKeys = new IntHashSet(shrinkToCapacity);
this.rowIds = new DirectLongList(shrinkToCapacity, MemoryTag.NATIVE_LONG_LIST);
}
@Override
public void close() {
super.close();
if (rowIds.size() > shrinkToCapacity) {
rowIds = Misc.free(rowIds);
rowIds = new DirectLongList(shrinkToCapacity, MemoryTag.NATIVE_LONG_LIST);
foundKeys = new IntHashSet(shrinkToCapacity);
// symbolKeys is unlikely to take too much memory
// because every value is associated with a value from `in (...)` WHERE filter and
// the list of parsed functions is of bigger size than symbolKeys hash set.
}
}
@Override
void of(DataFrameCursor dataFrameCursor, SqlExecutionContext executionContext) throws SqlException {
this.dataFrameCursor = dataFrameCursor;
this.recordA.of(dataFrameCursor.getTableReader());
this.recordB.of(dataFrameCursor.getTableReader());
dataFrameCursor.toTop();
foundKeys.clear();
rowIds.clear();
// Find all record IDs and save in rowIds in descending order
// return then row by row in ascending timestamp order
// since most of the time factory is supposed to return in ASC timestamp order
// It can be optimised later on to not buffer row IDs and return in desc order.
if (restrictedByValues) {
if (symbolKeys.size() > 0) {
// Find only restricted set of symbol keys
rowIds.extend(symbolKeys.size());
if (filter != null) {
filter.init(this, executionContext);
filter.toTop();
findRestrictedWithFilter(filter, symbolKeys);
} else {
findRestrictedNoFilter(symbolKeys);
}
}
} else {
// Find latest by all distinct symbol values
StaticSymbolTable symbolTable = dataFrameCursor.getSymbolTable(columnIndexes.getQuick(columnIndex));
int distinctSymbols = symbolTable.getSymbolCount();
if (symbolTable.containsNullValue()) {
distinctSymbols++;
}
rowIds.extend(distinctSymbols);
if (distinctSymbols > 0) {
if (filter != null) {
filter.init(this, executionContext);
filter.toTop();
findAllWithFilter(filter, distinctSymbols);
} else {
findAllNoFilter(distinctSymbols);
}
}
}
toTop();
}
IntHashSet getSymbolKeys() {
return symbolKeys;
}
public void destroy() {
// After close() the instance is designed to be re-usable.
// Destroy makes it non-reusable
rowIds = Misc.free(rowIds);
}
@Override
public long size() {
return rowIds.size();
}
@Override
public boolean hasNext() {
if (currentRow-- > 0) {
long rowId = rowIds.get(currentRow);
recordAt(recordA, rowId);
return true;
}
return false;
}
private void findAllNoFilter(int distinctCount) {
DataFrame frame = dataFrameCursor.next();
int foundSize = 0;
while (frame != null) {
long rowLo = frame.getRowLo();
long row = frame.getRowHi();
recordA.jumpTo(frame.getPartitionIndex(), 0);
while (row-- > rowLo) {
recordA.setRecordIndex(row);
int key = recordA.getInt(columnIndex);
if (foundKeys.add(key)) {
rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row));
if (++foundSize == distinctCount) {
return;
}
}
}
frame = dataFrameCursor.next();
}
}
private void findAllWithFilter(Function filter, int distinctCount) {
DataFrame frame = dataFrameCursor.next();
int foundSize = 0;
while (frame != null) {
long rowLo = frame.getRowLo();
long row = frame.getRowHi();
recordA.jumpTo(frame.getPartitionIndex(), 0);
while (row-- > rowLo) {
recordA.setRecordIndex(row);
int key = recordA.getInt(columnIndex);
if (filter.getBool(recordA) && foundKeys.add(key)) {
rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row));
if (++foundSize == distinctCount) {
return;
}
}
}
frame = dataFrameCursor.next();
}
}
private void findRestrictedNoFilter(IntHashSet symbolKeys) {
DataFrame frame = dataFrameCursor.next();
int searchSize = symbolKeys.size();
int foundSize = 0;
while (frame != null) {
long rowLo = frame.getRowLo();
long row = frame.getRowHi();
recordA.jumpTo(frame.getPartitionIndex(), 0);
while (row-- > rowLo) {
recordA.setRecordIndex(row);
int key = recordA.getInt(columnIndex);
if (symbolKeys.contains(key) && foundKeys.add(key)) {
rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row));
if (++foundSize == searchSize) {
return;
}
}
}
frame = dataFrameCursor.next();
}
}
private void findRestrictedWithFilter(Function filter, IntHashSet symbolKeys) {
DataFrame frame = dataFrameCursor.next();
int searchSize = symbolKeys.size();
int foundSize = 0;
while (frame != null) {
long rowLo = frame.getRowLo();
long row = frame.getRowHi();
recordA.jumpTo(frame.getPartitionIndex(), 0);
while (row-- > rowLo) {
recordA.setRecordIndex(row);
int key = recordA.getInt(columnIndex);
if (filter.getBool(recordA) && symbolKeys.contains(key) && foundKeys.add(key)) {
rowIds.add(Rows.toRowID(frame.getPartitionIndex(), row));
if (++foundSize == searchSize) {
return;
}
}
}
frame = dataFrameCursor.next();
}
}
@Override
public void toTop() {
currentRow = (int) rowIds.size();
}
}
......@@ -84,8 +84,7 @@ public class IntHashSet extends AbstractIntHashSet {
public final void clear() {
free = capacity;
Arrays.fill(keys, noEntryKeyValue
);
Arrays.fill(keys, noEntryKeyValue);
list.clear();
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.std.Chars;
import io.questdb.std.Files;
import io.questdb.std.FilesFacadeImpl;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.StringSink;
import org.junit.Test;
public class LatestByTest extends AbstractGriffinTest {
@Test
public void testLatestByDoesNotNeedFullScan() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select rnd_symbol('a', 'b') s, timestamp_sequence(0, 60*60*1000*1000L) ts from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("s\tts\n" +
"a\t1970-01-02T23:00:00.000000Z\n" +
"b\t1970-01-03T00:00:00.000000Z\n",
"t " +
"where s in ('a', 'b') " +
"latest on ts partition by s",
"ts",
true,
true);
});
}
@Test
public void testLatestBySymbolEmpty() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan any partition, searched symbol values don't exist in symbol table
if (Chars.contains(name, "1970-01-01") || Chars.contains(name, "1970-01-02")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('g', 'd', 'f') s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(40)" +
") timestamp(ts) Partition by DAY");
assertQuery("x\ts\tts\n",
"t where s in ('a', 'b') latest on ts partition by s",
"ts",
true,
true);
});
}
@Test
public void testLatestBySymbolManyDistinctValues() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in other partitions
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol(10000, 1, 15, 1000) s, " +
"timestamp_sequence(0, 1000*1000L) ts " +
"from long_sequence(1000000)" +
") timestamp(ts) Partition by DAY");
assertQuery("min\tmax\n" +
"1970-01-11T15:33:16.000000Z\t1970-01-12T13:46:39.000000Z\n",
"select min(ts), max(ts) from (select ts, x, s from t latest on ts partition by s)",
null,
false,
true);
assertQuery("min\tmax\n" +
"1970-01-11T20:55:39.000000Z\t1970-01-12T13:46:34.000000Z\n",
"select min(ts), max(ts) from (" +
"select ts, x, s " +
"from t " +
"where s in (" + selectDistinctSym("t", 500, "s") + ") " +
"latest on ts partition by s" +
")",
null,
false,
true);
});
}
@Test
public void testLatestBySymbolUnfilteredDoesNotDoFullScan() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', null) s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("ts\tx\ts\n" +
"1970-01-02T22:00:00.000000Z\t47\tb\n" +
"1970-01-02T23:00:00.000000Z\t48\ta\n" +
"1970-01-03T00:00:00.000000Z\t49\t\n",
"select ts, x, s from t latest on ts partition by s",
"ts",
true,
true);
});
}
@Test
public void testLatestBySymbolWithNoNulls() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', 'c', 'd', 'e', 'f') s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("ts\tx\ts\n" +
"1970-01-02T17:00:00.000000Z\t42\td\n" +
"1970-01-02T19:00:00.000000Z\t44\te\n" +
"1970-01-02T21:00:00.000000Z\t46\tc\n" +
"1970-01-02T22:00:00.000000Z\t47\tb\n" +
"1970-01-02T23:00:00.000000Z\t48\ta\n" +
"1970-01-03T00:00:00.000000Z\t49\tf\n",
"select ts, x, s from t latest on ts partition by s",
"ts",
true,
true);
assertQuery("ts\tx\ts\n" +
"1970-01-03T00:00:00.000000Z\t49\tf\n" +
"1970-01-02T19:00:00.000000Z\t44\te\n" +
"1970-01-02T17:00:00.000000Z\t42\td\n" +
"1970-01-02T21:00:00.000000Z\t46\tc\n" +
"1970-01-02T22:00:00.000000Z\t47\tb\n" +
"1970-01-02T23:00:00.000000Z\t48\ta\n",
"select ts, x, s from t latest on ts partition by s order by s desc",
null,
true,
true);
});
}
@Test
public void testLatestWithFilterByDoesNotNeedFullScan() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', null) s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("x\ts\tts\n" +
"44\tb\t1970-01-02T19:00:00.000000Z\n" +
"48\ta\t1970-01-02T23:00:00.000000Z\n",
"t " +
"where s in ('a', 'b') and x%2 = 0 " +
"latest on ts partition by s",
"ts",
true,
true);
});
}
@Test
public void testLatestWithFilterByDoesNotNeedFullScanValueNotInSymbolTable() throws Exception {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
assertQuery("x\ts\tts\n" +
"44\tb\t1970-01-02T19:00:00.000000Z\n" +
"48\ta\t1970-01-02T23:00:00.000000Z\n",
"t " +
"where s in ('a', 'b', 'c') and x%2 = 0 " +
"latest on ts partition by s",
"create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', null) s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY",
"ts",
"insert into t values (1000, 'c', '1970-01-02T20:00')",
"x\ts\tts\n" +
"44\tb\t1970-01-02T19:00:00.000000Z\n" +
"1000\tc\t1970-01-02T20:00:00.000000Z\n" +
"48\ta\t1970-01-02T23:00:00.000000Z\n",
true,
true,
true);
}
@Test
public void testLatestWithNullInSymbolFilterDoesNotDoFullScan() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', null) s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("x\ts\tts\n" +
"48\ta\t1970-01-02T23:00:00.000000Z\n" +
"49\t\t1970-01-03T00:00:00.000000Z\n",
"t where s in ('a', null) latest on ts partition by s",
"ts",
true,
true);
});
}
@Test
public void testLatestWithoutSymbolFilterDoesNotDoFullScan() throws Exception {
assertMemoryLeak(() -> {
ff = new FilesFacadeImpl() {
@Override
public long openRO(LPSZ name) {
// Query should not scan the first partition
// all the latest values are in the second, third partition
if (Chars.contains(name, "1970-01-01")) {
return -1;
}
return Files.openRO(name);
}
};
compile("create table t as (" +
"select " +
"x, " +
"rnd_symbol('a', 'b', null) s, " +
"timestamp_sequence(0, 60*60*1000*1000L) ts " +
"from long_sequence(49)" +
") timestamp(ts) Partition by DAY");
assertQuery("x\ts\tts\n" +
"35\ta\t1970-01-02T10:00:00.000000Z\n" +
"47\tb\t1970-01-02T22:00:00.000000Z\n" +
"49\t\t1970-01-03T00:00:00.000000Z\n",
"t where x%2 = 1 latest on ts partition by s",
"ts",
true,
true);
});
}
private String selectDistinctSym(String table, int count, String columnName) throws SqlException {
StringSink sink = new StringSink();
try (RecordCursorFactory factory = compiler.compile("select distinct " + columnName + " from " + table + " limit " + count, sqlExecutionContext).getRecordCursorFactory()) {
try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) {
final Record record = cursor.getRecord();
int i = 0;
while (cursor.hasNext()) {
if (i++ > 0) {
sink.put(',');
}
sink.put('\'').put(record.getSym(0)).put('\'');
}
}
}
return sink.toString();
}
}
......@@ -407,7 +407,7 @@ public final class TestUtils {
runnable.run();
Path.clearThreadLocals();
if (fileCount != Files.getOpenFileCount()) {
Assert.assertEquals(Files.getOpenFdDebugInfo(), fileCount, Files.getOpenFileCount());
Assert.assertEquals("file descriptors " + Files.getOpenFdDebugInfo(), fileCount, Files.getOpenFileCount());
}
// Checks that the same tag used for allocation and freeing native memory
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册