提交 dc0bb73f 编写于 作者: V Vlad Ilyushchenko

GRIFFIN: implementation of 'select count() from table' that would use result...

GRIFFIN: implementation of 'select count() from table' that uses the result of base.size() when it is available. This speeds up exploratory 'count()' queries dramatically.
上级 53dd15a9
......@@ -42,16 +42,10 @@ import io.questdb.griffin.model.*;
import io.questdb.std.*;
import org.jetbrains.annotations.NotNull;
import static io.questdb.griffin.model.ExpressionNode.FUNCTION;
public class SqlCodeGenerator {
private static final IntHashSet limitTypes = new IntHashSet();
// Registers the LONG, BYTE, SHORT and INT column types in limitTypes.
// NOTE(review): presumably these are the types accepted for LIMIT arguments - confirm against the usages of limitTypes.
static {
limitTypes.add(ColumnType.LONG);
limitTypes.add(ColumnType.BYTE);
limitTypes.add(ColumnType.SHORT);
limitTypes.add(ColumnType.INT);
}
private final WhereClauseParser filterAnalyser = new WhereClauseParser();
private final FunctionParser functionParser;
private final CairoEngine engine;
......@@ -67,7 +61,6 @@ public class SqlCodeGenerator {
private final ArrayColumnTypes valueTypes = new ArrayColumnTypes();
private final EntityColumnFilter entityColumnFilter = new EntityColumnFilter();
private boolean fullFatJoins = false;
public SqlCodeGenerator(
CairoEngine engine,
CairoConfiguration configuration,
......@@ -753,7 +746,7 @@ public class SqlCodeGenerator {
) throws SqlException {
ExpressionNode tableName = model.getTableName();
if (tableName != null) {
if (tableName.type == ExpressionNode.FUNCTION) {
if (tableName.type == FUNCTION) {
return generateFunctionQuery(model, executionContext);
} else {
return generateTableQuery(model, executionContext);
......@@ -879,90 +872,6 @@ public class SqlCodeGenerator {
return factory;
}
/**
 * Compiles the union model attached to {@code model} and combines it with the already
 * compiled {@code masterFactory} through a two-argument SET factory. This method dispatches
 * UNION (distinct) and UNION ALL only.
 *
 * @param model            model whose QueryModel.getUnionModel() supplies the second argument
 * @param masterFactory    compiled first argument of the SET operation
 * @param executionContext execution context handed to nested query generation
 * @return factory that performs the SET operation
 * @throws SqlException propagated from nested query generation or from the delegate generators
 */
private RecordCursorFactory generateSetFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext
) throws SqlException {
    // the second argument is always compiled first, whatever the operation turns out to be
    final RecordCursorFactory secondArgument = generateQuery0(model.getUnionModel(), executionContext, true);

    final boolean distinctUnion = model.getUnionModelType() == QueryModel.UNION_MODEL_DISTINCT;
    if (distinctUnion) {
        return generateUnionFactory(model, masterFactory, executionContext, secondArgument);
    }

    if (model.getUnionModelType() == QueryModel.UNION_MODEL_ALL) {
        return generateUnionAllFactory(model, masterFactory, executionContext, secondArgument);
    }

    // no other union model type is handled here
    assert false;
    return null;
}
/**
 * Builds a UNION (distinct) factory over the master and slave factories.
 * <p>
 * Column types are compared position by position across the master's column count; the
 * first difference raises an error at the union model's position. If the union model has
 * a union model of its own, the new factory is passed to generateSetFactory() as the next
 * first argument.
 *
 * @param model            model that owns the union model being compiled
 * @param masterFactory    compiled first argument
 * @param executionContext execution context forwarded to generateSetFactory()
 * @param slaveFactory     compiled second argument
 * @return the union factory, or the factory produced for the remainder of the chain
 * @throws SqlException when column types differ at any position
 */
private RecordCursorFactory generateUnionFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext,
        RecordCursorFactory slaveFactory
) throws SqlException {
    final RecordMetadata masterMetadata = masterFactory.getMetadata();
    final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
    final int columnCount = masterMetadata.getColumnCount();

    for (int index = 0; index < columnCount; index++) {
        if (masterMetadata.getColumnType(index) == slaveMetadata.getColumnType(index)) {
            continue;
        }
        throw SqlException
                .$(model.getUnionModel().getModelPosition(), "column type mismatch [index=").put(index)
                .put(", A=").put(ColumnType.nameOf(masterMetadata.getColumnType(index)))
                .put(", B=").put(ColumnType.nameOf(slaveMetadata.getColumnType(index)))
                .put(']');
    }

    // the record sink covers every column of the master metadata
    entityColumnFilter.of(columnCount);
    final RecordSink sink = RecordSinkFactory.getInstance(asm, masterMetadata, entityColumnFilter, true);
    valueTypes.reset();

    final RecordCursorFactory union = new UnionRecordCursorFactory(
            configuration,
            masterMetadata,
            masterFactory,
            slaveFactory,
            sink,
            valueTypes
    );

    if (model.getUnionModel().getUnionModel() == null) {
        return union;
    }
    return generateSetFactory(model.getUnionModel(), union, executionContext);
}
/**
 * Builds a UNION ALL factory over the master and slave factories.
 * <p>
 * Column types are compared position by position across the master's column count; the
 * first difference raises an error at the union model's position. If the union model has
 * a union model of its own, the new factory is passed to generateSetFactory() as the next
 * first argument.
 *
 * @param model            model that owns the union model being compiled
 * @param masterFactory    compiled first argument
 * @param executionContext execution context forwarded to generateSetFactory()
 * @param slaveFactory     compiled second argument
 * @return the union-all factory, or the factory produced for the remainder of the chain
 * @throws SqlException when column types differ at any position
 */
private RecordCursorFactory generateUnionAllFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext,
        RecordCursorFactory slaveFactory
) throws SqlException {
    final RecordMetadata masterMetadata = masterFactory.getMetadata();
    final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
    final int columnCount = masterMetadata.getColumnCount();

    for (int index = 0; index < columnCount; index++) {
        if (masterMetadata.getColumnType(index) == slaveMetadata.getColumnType(index)) {
            continue;
        }
        throw SqlException
                .$(model.getUnionModel().getModelPosition(), "column type mismatch [index=").put(index)
                .put(", A=").put(ColumnType.nameOf(masterMetadata.getColumnType(index)))
                .put(", B=").put(ColumnType.nameOf(slaveMetadata.getColumnType(index)))
                .put(']');
    }

    final RecordCursorFactory unionAll = new UnionAllRecordCursorFactory(masterMetadata, masterFactory, slaveFactory);

    if (model.getUnionModel().getUnionModel() == null) {
        return unionAll;
    }
    return generateSetFactory(model.getUnionModel(), unionAll, executionContext);
}
private RecordCursorFactory generateQuery0(QueryModel model, SqlExecutionContext executionContext, boolean processJoins) throws SqlException {
return generateLimit(
generateOrderBy(
......@@ -1161,6 +1070,22 @@ public class SqlCodeGenerator {
return new SelectedRecordCursorFactory(selectMetadata, columnCrossIndex, factory);
}
/**
 * Wraps the factory generated for the nested model in a DistinctRecordCursorFactory.
 * If constructing the wrapper fails with a CairoException, the nested factory is closed
 * before the exception is rethrown.
 *
 * @param model            model whose sub-query is compiled via generateSubQuery()
 * @param executionContext execution context forwarded to generateSubQuery()
 * @return factory wrapping the sub-query factory
 * @throws SqlException propagated from sub-query generation
 */
private RecordCursorFactory generateSelectDistinct(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
    final RecordCursorFactory nested = generateSubQuery(model, executionContext);
    try {
        final RecordCursorFactory distinct = new DistinctRecordCursorFactory(configuration, nested, entityColumnFilter, asm);
        return distinct;
    } catch (CairoException failure) {
        // the wrapper never took ownership, so release the nested factory here
        nested.close();
        throw failure;
    }
}
private RecordCursorFactory generateSelectGroupBy(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
// fail fast if we cannot create timestamp sampler
......@@ -1172,6 +1097,22 @@ public class SqlCodeGenerator {
final RecordCursorFactory factory = generateSubQuery(model, executionContext);
try {
// generate special case plan for "select count() from somewhere"
ObjList<QueryColumn> columns = model.getColumns();
if (columns.size() == 1) {
QueryColumn column = columns.getQuick(0);
if (column.getAst().type == FUNCTION && Chars.equalsLowerCaseAscii(column.getAst().token, "count")) {
if (Chars.equalsLowerCaseAscii(column.getName(), "count")) {
return new CountRecordCursorFactory(CountRecordCursorFactory.DEFAULT_COUNT_METADATA, factory);
}
GenericRecordMetadata metadata = new GenericRecordMetadata();
metadata.add(new TableColumnMetadata(Chars.toString(column.getName()), ColumnType.LONG));
return new CountRecordCursorFactory(metadata, factory);
}
}
keyTypes.reset();
valueTypes.reset();
listColumnFilterA.clear();
......@@ -1194,22 +1135,6 @@ public class SqlCodeGenerator {
}
}
/**
 * Wraps the factory generated for the nested model in a DistinctRecordCursorFactory.
 * If constructing the wrapper fails with a CairoException, the nested factory is closed
 * before the exception is rethrown.
 *
 * @param model            model whose sub-query is compiled via generateSubQuery()
 * @param executionContext execution context forwarded to generateSubQuery()
 * @return factory wrapping the sub-query factory
 * @throws SqlException propagated from sub-query generation
 */
private RecordCursorFactory generateSelectDistinct(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
    final RecordCursorFactory nested = generateSubQuery(model, executionContext);
    try {
        final RecordCursorFactory distinct = new DistinctRecordCursorFactory(configuration, nested, entityColumnFilter, asm);
        return distinct;
    } catch (CairoException failure) {
        // the wrapper never took ownership, so release the nested factory here
        nested.close();
        throw failure;
    }
}
private RecordCursorFactory generateSelectVirtual(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
assert model.getNestedModel() != null;
final RecordCursorFactory factory = generateSubQuery(model, executionContext);
......@@ -1266,6 +1191,32 @@ public class SqlCodeGenerator {
}
}
/**
 * Compiles the union model attached to {@code model} and combines it with the already
 * compiled {@code masterFactory} through a two-argument SET factory. This method dispatches
 * UNION (distinct) and UNION ALL only.
 *
 * @param model            model whose QueryModel.getUnionModel() supplies the second argument
 * @param masterFactory    compiled first argument of the SET operation
 * @param executionContext execution context handed to nested query generation
 * @return factory that performs the SET operation
 * @throws SqlException propagated from nested query generation or from the delegate generators
 */
private RecordCursorFactory generateSetFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext
) throws SqlException {
    // the second argument is always compiled first, whatever the operation turns out to be
    final RecordCursorFactory secondArgument = generateQuery0(model.getUnionModel(), executionContext, true);

    final boolean distinctUnion = model.getUnionModelType() == QueryModel.UNION_MODEL_DISTINCT;
    if (distinctUnion) {
        return generateUnionFactory(model, masterFactory, executionContext, secondArgument);
    }

    if (model.getUnionModelType() == QueryModel.UNION_MODEL_ALL) {
        return generateUnionAllFactory(model, masterFactory, executionContext, secondArgument);
    }

    // no other union model type is handled here
    assert false;
    return null;
}
private RecordCursorFactory generateSubQuery(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
assert model.getNestedModel() != null;
return generateQuery(model.getNestedModel(), executionContext, true);
......@@ -1446,6 +1397,64 @@ public class SqlCodeGenerator {
}
}
/**
 * Builds a UNION ALL factory over the master and slave factories.
 * <p>
 * Column types are compared position by position across the master's column count; the
 * first difference raises an error at the union model's position. If the union model has
 * a union model of its own, the new factory is passed to generateSetFactory() as the next
 * first argument.
 *
 * @param model            model that owns the union model being compiled
 * @param masterFactory    compiled first argument
 * @param executionContext execution context forwarded to generateSetFactory()
 * @param slaveFactory     compiled second argument
 * @return the union-all factory, or the factory produced for the remainder of the chain
 * @throws SqlException when column types differ at any position
 */
private RecordCursorFactory generateUnionAllFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext,
        RecordCursorFactory slaveFactory
) throws SqlException {
    final RecordMetadata masterMetadata = masterFactory.getMetadata();
    final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
    final int columnCount = masterMetadata.getColumnCount();

    for (int index = 0; index < columnCount; index++) {
        if (masterMetadata.getColumnType(index) == slaveMetadata.getColumnType(index)) {
            continue;
        }
        throw SqlException
                .$(model.getUnionModel().getModelPosition(), "column type mismatch [index=").put(index)
                .put(", A=").put(ColumnType.nameOf(masterMetadata.getColumnType(index)))
                .put(", B=").put(ColumnType.nameOf(slaveMetadata.getColumnType(index)))
                .put(']');
    }

    final RecordCursorFactory unionAll = new UnionAllRecordCursorFactory(masterMetadata, masterFactory, slaveFactory);

    if (model.getUnionModel().getUnionModel() == null) {
        return unionAll;
    }
    return generateSetFactory(model.getUnionModel(), unionAll, executionContext);
}
/**
 * Builds a UNION (distinct) factory over the master and slave factories.
 * <p>
 * Column types are compared position by position across the master's column count; the
 * first difference raises an error at the union model's position. If the union model has
 * a union model of its own, the new factory is passed to generateSetFactory() as the next
 * first argument.
 *
 * @param model            model that owns the union model being compiled
 * @param masterFactory    compiled first argument
 * @param executionContext execution context forwarded to generateSetFactory()
 * @param slaveFactory     compiled second argument
 * @return the union factory, or the factory produced for the remainder of the chain
 * @throws SqlException when column types differ at any position
 */
private RecordCursorFactory generateUnionFactory(
        QueryModel model,
        RecordCursorFactory masterFactory,
        SqlExecutionContext executionContext,
        RecordCursorFactory slaveFactory
) throws SqlException {
    final RecordMetadata masterMetadata = masterFactory.getMetadata();
    final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
    final int columnCount = masterMetadata.getColumnCount();

    for (int index = 0; index < columnCount; index++) {
        if (masterMetadata.getColumnType(index) == slaveMetadata.getColumnType(index)) {
            continue;
        }
        throw SqlException
                .$(model.getUnionModel().getModelPosition(), "column type mismatch [index=").put(index)
                .put(", A=").put(ColumnType.nameOf(masterMetadata.getColumnType(index)))
                .put(", B=").put(ColumnType.nameOf(slaveMetadata.getColumnType(index)))
                .put(']');
    }

    // the record sink covers every column of the master metadata
    entityColumnFilter.of(columnCount);
    final RecordSink sink = RecordSinkFactory.getInstance(asm, masterMetadata, entityColumnFilter, true);
    valueTypes.reset();

    final RecordCursorFactory union = new UnionRecordCursorFactory(
            configuration,
            masterMetadata,
            masterFactory,
            slaveFactory,
            sink,
            valueTypes
    );

    if (model.getUnionModel().getUnionModel() == null) {
        return union;
    }
    return generateSetFactory(model.getUnionModel(), union, executionContext);
}
private void lookupColumnIndexes(
ListColumnFilter filter,
ObjList<ExpressionNode> columnNames,
......@@ -1508,4 +1517,11 @@ public class SqlCodeGenerator {
return firstColumnType;
}
// Registers the LONG, BYTE, SHORT and INT column types in limitTypes.
// NOTE(review): presumably these are the types accepted for LIMIT arguments - confirm against the usages of limitTypes.
static {
limitTypes.add(ColumnType.LONG);
limitTypes.add(ColumnType.BYTE);
limitTypes.add(ColumnType.SHORT);
limitTypes.add(ColumnType.INT);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.griffin.engine.groupby;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.GenericRecordMetadata;
import io.questdb.cairo.TableColumnMetadata;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlExecutionContext;
/**
 * Factory whose cursor yields exactly one record holding the row count of a base factory.
 * <p>
 * The count is taken from the base cursor's size() when that value is non-negative;
 * otherwise the base cursor is iterated to the end and the rows are counted one by one.
 * A single cursor instance is held by the factory and re-initialised on every getCursor() call.
 */
public class CountRecordCursorFactory extends AbstractRecordCursorFactory {
    /**
     * Metadata with a single LONG column named "count".
     */
    public static final GenericRecordMetadata DEFAULT_COUNT_METADATA = new GenericRecordMetadata();

    static {
        DEFAULT_COUNT_METADATA.add(new TableColumnMetadata("count", ColumnType.LONG));
    }

    private final CountRecordCursor cursor = new CountRecordCursor();
    private final RecordCursorFactory base;

    /**
     * @param metadata metadata reported by this factory
     * @param base     factory whose rows are counted; closed together with this factory
     */
    public CountRecordCursorFactory(RecordMetadata metadata, RecordCursorFactory base) {
        super(metadata);
        this.base = base;
    }

    @Override
    public void close() {
        base.close();
    }

    /**
     * Opens the base cursor, determines the row count, closes the base cursor and
     * returns the factory's own cursor positioned before its single record.
     */
    @Override
    public RecordCursor getCursor(SqlExecutionContext executionContext) {
        try (RecordCursor source = base.getCursor(executionContext)) {
            long rows = source.size();
            if (rows < 0) {
                // negative size: fall back to walking the cursor
                rows = 0;
                while (source.hasNext()) {
                    rows++;
                }
            }
            cursor.of(rows);
        }
        return cursor;
    }

    @Override
    public boolean isRandomAccessCursor() {
        return false;
    }

    /**
     * Cursor over a single record that exposes the count as a long value.
     */
    private static class CountRecordCursor implements NoRandomAccessRecordCursor {
        private final CountRecord countRecord = new CountRecord();
        // true until the single record has been handed out
        private boolean pending = true;
        private long count;

        @Override
        public void close() {
        }

        @Override
        public Record getRecord() {
            return countRecord;
        }

        @Override
        public boolean hasNext() {
            final boolean available = pending;
            pending = false;
            return available;
        }

        @Override
        public void toTop() {
            pending = true;
        }

        @Override
        public long size() {
            return 1;
        }

        // stores the new count and rewinds the cursor
        private void of(long count) {
            this.count = count;
            this.pending = true;
        }

        /**
         * Record view that returns the cursor's count for any column index.
         */
        private class CountRecord implements Record {
            @Override
            public long getLong(int col) {
                return count;
            }
        }
    }
}
......@@ -115,16 +115,17 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
Assert.assertNull(compiler.compile("truncate table x"));
assertQuery(
"count\n",
"count\n" +
"0\n",
"select count() from x",
null,
true
false
);
Assert.assertEquals(0, engine.getBusyWriterCount());
......@@ -148,7 +149,7 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
assertQuery(
......@@ -156,7 +157,7 @@ public class TruncateTest extends AbstractGriffinTest {
"20\n",
"select count() from y",
null,
true
false
);
CyclicBarrier useBarrier = new CyclicBarrier(2);
......@@ -192,7 +193,7 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
assertQuery(
......@@ -200,7 +201,7 @@ public class TruncateTest extends AbstractGriffinTest {
"20\n",
"select count() from y",
null,
true
false
);
haltLatch.await(1, TimeUnit.SECONDS);
......@@ -210,38 +211,6 @@ public class TruncateTest extends AbstractGriffinTest {
});
}
/**
 * Creates a table with a million rows, reads it end to end through a cursor,
 * then truncates the table and releases all writers and readers.
 */
@Test
public void testTruncateOpenReader() throws Exception {
    TestUtils.assertMemoryLeak(() -> {
        createX(1_000_000);

        assertQuery("count\n1000000\n", "select count() from x", null, true);

        // touch one int, one symbol and one double column on every row
        try (
                RecordCursorFactory reader = compiler.compile("select * from x");
                RecordCursor rows = reader.getCursor(DefaultSqlExecutionContext.INSTANCE)
        ) {
            final Record row = rows.getRecord();
            while (rows.hasNext()) {
                row.getInt(0);
                row.getSym(1);
                row.getDouble(2);
            }
        }

        compiler.compile("truncate table 'x'");
        engine.releaseAllWriters();
        engine.releaseAllReaders();
    });
}
@Test
public void testTableDoesNotExist() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -253,7 +222,7 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
assertQuery(
......@@ -261,7 +230,7 @@ public class TruncateTest extends AbstractGriffinTest {
"20\n",
"select count() from y",
null,
true
false
);
try {
......@@ -277,7 +246,7 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
assertQuery(
......@@ -285,7 +254,7 @@ public class TruncateTest extends AbstractGriffinTest {
"20\n",
"select count() from y",
null,
true
false
);
engine.releaseAllWriters();
......@@ -293,6 +262,38 @@ public class TruncateTest extends AbstractGriffinTest {
});
}
/**
 * Creates a table with a million rows, reads it end to end through a cursor,
 * then truncates the table and releases all writers and readers.
 */
@Test
public void testTruncateOpenReader() throws Exception {
    TestUtils.assertMemoryLeak(() -> {
        createX(1_000_000);

        assertQuery("count\n1000000\n", "select count() from x", null, false);

        // touch one int, one symbol and one double column on every row
        try (
                RecordCursorFactory reader = compiler.compile("select * from x");
                RecordCursor rows = reader.getCursor(DefaultSqlExecutionContext.INSTANCE)
        ) {
            final Record row = rows.getRecord();
            while (rows.hasNext()) {
                row.getInt(0);
                row.getSym(1);
                row.getDouble(2);
            }
        }

        compiler.compile("truncate table 'x'");
        engine.releaseAllWriters();
        engine.releaseAllReaders();
    });
}
@Test
public void testTwoTables() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -304,7 +305,7 @@ public class TruncateTest extends AbstractGriffinTest {
"10\n",
"select count() from x",
null,
true
false
);
assertQuery(
......@@ -312,23 +313,25 @@ public class TruncateTest extends AbstractGriffinTest {
"20\n",
"select count() from y",
null,
true
false
);
Assert.assertNull(compiler.compile("truncate table x, y"));
assertQuery(
"count\n",
"count\n" +
"0\n",
"select count() from x",
null,
true
false
);
assertQuery(
"count\n",
"count\n" +
"0\n",
"select count() from y",
null,
true
false
);
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.griffin.engine.groupby;
import io.questdb.griffin.AbstractGriffinTest;
import io.questdb.griffin.engine.functions.rnd.SharedRandom;
import io.questdb.std.Rnd;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests for "select count()" queries: with a column alias, over a plain table scan,
 * and over a filtered scan. Each test runs the query, inserts more rows and runs it again.
 */
public class CountTest extends AbstractGriffinTest {

    @Before
    public void setUp3() {
        // every test starts from a freshly constructed random generator
        SharedRandom.RANDOM.set(new Rnd());
    }

    @Test
    public void testColumnAlias() throws Exception {
        assertQuery(
                "cnt\n20\n",
                "select count() cnt from x",
                createXDdl("abs(rnd_long())", 20),
                null,
                insertIntoXDdl("abs(rnd_long())", 5),
                "cnt\n25\n",
                false
        );
    }

    @Test
    public void testKnownSize() throws Exception {
        assertQuery(
                "count\n20\n",
                "select count() from x",
                createXDdl("abs(rnd_long())", 20),
                null,
                insertIntoXDdl("abs(rnd_long())", 5),
                "count\n25\n",
                false
        );
    }

    @Test
    public void testUnknownSize() throws Exception {
        assertQuery(
                "count\n4919\n",
                "select count() from x where g > 0",
                createXDdl("rnd_long()", 10000),
                null,
                insertIntoXDdl("rnd_long()", 800),
                "count\n5319\n",
                false
        );
    }

    // DDL that creates table x from rowCount generated rows
    private static String createXDdl(String gExpression, int rowCount) {
        return "create table x as (" + randomRowsSelect(gExpression, rowCount) + ") timestamp(k) partition by NONE";
    }

    // statement that appends rowCount generated rows to table x
    private static String insertIntoXDdl(String gExpression, int rowCount) {
        return "insert into x select * from (" + randomRowsSelect(gExpression, rowCount) + ") timestamp(k)";
    }

    // select producing rowCount random rows; gExpression is the expression behind column g
    private static String randomRowsSelect(String gExpression, int rowCount) {
        return "select" +
                " rnd_float(0)*100 a," +
                " rnd_symbol(5,4,4,1) b," +
                " rnd_double(0)*100 c," +
                " abs(rnd_int()) d," +
                " rnd_byte(2, 50) e," +
                " abs(rnd_short()) f," +
                " " + gExpression + " g," +
                " timestamp_sequence(to_timestamp(0), 0) k" +
                " from" +
                " long_sequence(" + rowCount + ")";
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册