提交 793cc26e 编写于 作者: V Vlad Ilyushchenko

GRIFFIN: removed duplicate code. Improved performance of symbol based filter,...

GRIFFIN: removed duplicate code. Improved performance of symbol based filter, such as sym = 'XYZ' in cases where 'XYZ' does not exist. The cursor will not iterate over data if filter becomes constant.
上级 759158df
......@@ -28,13 +28,11 @@ import io.questdb.std.ImmutableIterator;
import java.io.Closeable;
public interface DataFrameCursor extends ImmutableIterator<DataFrame>, Closeable {
public interface DataFrameCursor extends ImmutableIterator<DataFrame>, Closeable, SymbolTableSource {
@Override
void close(); // we don't throw IOException
SymbolTable getSymbolTable(int columnIndex);
// same TableReader is available on each data frame
TableReader getTableReader();
......
......@@ -84,7 +84,7 @@ public interface Function extends Closeable {
int getType();
default void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
default void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
}
default boolean isConstant() {
......
......@@ -25,7 +25,7 @@ package io.questdb.cairo.sql;
import java.io.Closeable;
public interface RecordCursor extends Closeable {
public interface RecordCursor extends Closeable, SymbolTableSource {
@Override
void close();
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.cairo.sql;
@FunctionalInterface
public interface SymbolTableSource {
SymbolTable getSymbolTable(int columnIndex);
}
......@@ -402,7 +402,10 @@ public class SqlCodeGenerator {
// check if there are post-filters
ExpressionNode filter = model.getWhereClause();
if (filter != null) {
factory = new FilteredRecordCursorFactory(factory, functionParser.parseFunction(filter, factory.getMetadata(), executionContext));
factory = new FilteredRecordCursorFactory(
factory,
functionParser.parseFunction(filter, factory.getMetadata(), executionContext)
);
}
return factory;
......@@ -1398,21 +1401,8 @@ public class SqlCodeGenerator {
}
private RecordCursorFactory generateUnionAllFactory(QueryModel model, RecordCursorFactory masterFactory, SqlExecutionContext executionContext, RecordCursorFactory slaveFactory) throws SqlException {
final RecordMetadata metadata = masterFactory.getMetadata();
final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
final int columnCount = metadata.getColumnCount();
for (int i = 0; i < columnCount; i++) {
if (metadata.getColumnType(i) != slaveMetadata.getColumnType(i)) {
throw SqlException
.$(model.getUnionModel().getModelPosition(), "column type mismatch [index=").put(i)
.put(", A=").put(ColumnType.nameOf(metadata.getColumnType(i)))
.put(", B=").put(ColumnType.nameOf(slaveMetadata.getColumnType(i)))
.put(']');
}
}
final RecordCursorFactory unionAllFactory = new UnionAllRecordCursorFactory(metadata, masterFactory, slaveFactory);
validateJoinColumnTypes(model, masterFactory, slaveFactory);
final RecordCursorFactory unionAllFactory = new UnionAllRecordCursorFactory(masterFactory, slaveFactory);
if (model.getUnionModel().getUnionModel() != null) {
return generateSetFactory(model.getUnionModel(), unionAllFactory, executionContext);
......@@ -1420,7 +1410,7 @@ public class SqlCodeGenerator {
return unionAllFactory;
}
private RecordCursorFactory generateUnionFactory(QueryModel model, RecordCursorFactory masterFactory, SqlExecutionContext executionContext, RecordCursorFactory slaveFactory) throws SqlException {
private void validateJoinColumnTypes(QueryModel model, RecordCursorFactory masterFactory, RecordCursorFactory slaveFactory) throws SqlException {
final RecordMetadata metadata = masterFactory.getMetadata();
final RecordMetadata slaveMetadata = slaveFactory.getMetadata();
final int columnCount = metadata.getColumnCount();
......@@ -1434,15 +1424,22 @@ public class SqlCodeGenerator {
.put(']');
}
}
}
entityColumnFilter.of(columnCount);
final RecordSink recordSink = RecordSinkFactory.getInstance(asm, metadata, entityColumnFilter, true);
private RecordCursorFactory generateUnionFactory(QueryModel model, RecordCursorFactory masterFactory, SqlExecutionContext executionContext, RecordCursorFactory slaveFactory) throws SqlException {
validateJoinColumnTypes(model, masterFactory, slaveFactory);
entityColumnFilter.of(masterFactory.getMetadata().getColumnCount());
final RecordSink recordSink = RecordSinkFactory.getInstance(
asm,
masterFactory.getMetadata(),
entityColumnFilter,
true
);
valueTypes.reset();
RecordCursorFactory unionFactory = new UnionRecordCursorFactory(
configuration,
metadata,
masterFactory,
slaveFactory,
recordSink,
......
......@@ -24,7 +24,7 @@
package io.questdb.griffin.engine.functions;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.SqlExecutionContext;
public interface BinaryFunction extends Function {
......@@ -36,9 +36,9 @@ public interface BinaryFunction extends Function {
}
@Override
default void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
getLeft().init(recordCursor, executionContext);
getRight().init(recordCursor, executionContext);
default void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
getLeft().init(symbolTableSource, executionContext);
getRight().init(symbolTableSource, executionContext);
}
@Override
......
......@@ -24,8 +24,6 @@
package io.questdb.griffin.engine.functions;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.griffin.SqlExecutionContext;
public interface StatelessFunction extends Function {
......@@ -33,10 +31,6 @@ public interface StatelessFunction extends Function {
default void close() {
}
@Override
default void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
}
@Override
default void toTop() {
}
......
......@@ -24,7 +24,7 @@
package io.questdb.griffin.engine.functions;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.SqlExecutionContext;
public interface UnaryFunction extends Function {
......@@ -34,8 +34,8 @@ public interface UnaryFunction extends Function {
}
@Override
default void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
getArg().init(recordCursor, executionContext);
default void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
getArg().init(symbolTableSource, executionContext);
}
@Override
......
......@@ -169,13 +169,13 @@ public class IndexedParameterLinkFunction implements Function {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
base = executionContext.getBindVariableService().getFunction(variableIndex);
if (base == null) {
throw CairoException.instance(0).put("undefined bind variable: ").put(variableIndex);
}
assert base.getType() == type;
base.init(recordCursor, executionContext);
base.init(symbolTableSource, executionContext);
}
private Function getBase() {
......
......@@ -169,13 +169,13 @@ public class NamedParameterLinkFunction implements Function {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
base = executionContext.getBindVariableService().getFunction(variableName);
if (base == null) {
throw CairoException.instance(0).put("undefined bind variable: ").put(variableName);
}
assert base.getType() == type;
base.init(recordCursor, executionContext);
base.init(symbolTableSource, executionContext);
}
private Function getBase() {
......
......@@ -26,10 +26,8 @@ package io.questdb.griffin.engine.functions.date;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.TimestampFunction;
import io.questdb.griffin.engine.functions.constants.TimestampConstant;
import io.questdb.std.Numbers;
......@@ -73,11 +71,6 @@ public class TimestampSequenceFunctionFactory implements FunctionFactory {
return result;
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
toTop();
}
@Override
public void toTop() {
next = start;
......
......@@ -26,8 +26,7 @@ package io.questdb.griffin.engine.functions.eq;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.BinaryFunction;
......@@ -105,15 +104,12 @@ public class EqSymCharFunctionFactory implements FunctionFactory {
@Override
public boolean getBool(Record rec) {
if (valueIndex == SymbolTable.VALUE_NOT_FOUND) {
return false;
}
return arg.getInt(rec) == valueIndex;
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
valueIndex = recordCursor.getSymbolTable(arg.getColumnIndex()).getQuick(SingleCharCharSequence.get(constant));
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
valueIndex = symbolTableSource.getSymbolTable(arg.getColumnIndex()).getQuick(SingleCharCharSequence.get(constant));
}
}
......
......@@ -26,8 +26,8 @@ package io.questdb.griffin.engine.functions.eq;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.BinaryFunction;
......@@ -63,9 +63,6 @@ public class EqSymStrFunctionFactory implements FunctionFactory {
private Function createHalfConstantFunc(int position, Function constFunc, Function varFunc) {
CharSequence constValue = constFunc.getStr(null);
if (varFunc instanceof SymbolColumn) {
if (constValue == null) {
return new NullCheckColumnFunc(position, varFunc);
}
return new ConstCheckColumnFunc(position, (SymbolColumn) varFunc, constValue);
} else {
if (constValue == null) {
......@@ -94,30 +91,6 @@ public class EqSymStrFunctionFactory implements FunctionFactory {
}
}
private static class NullCheckColumnFunc extends BooleanFunction implements UnaryFunction {
private final Function arg;
public NullCheckColumnFunc(int position, Function arg) {
super(position);
this.arg = arg;
}
@Override
public Function getArg() {
return arg;
}
@Override
public boolean getBool(Record rec) {
return arg.getInt(rec) == SymbolTable.VALUE_IS_NULL;
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
}
}
private static class ConstCheckFunc extends BooleanFunction implements UnaryFunction {
private final Function arg;
private final CharSequence constant;
......@@ -157,15 +130,17 @@ public class EqSymStrFunctionFactory implements FunctionFactory {
@Override
public boolean getBool(Record rec) {
if (valueIndex == SymbolTable.VALUE_NOT_FOUND) {
return false;
}
return arg.getInt(rec) == valueIndex;
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
valueIndex = recordCursor.getSymbolTable(arg.getColumnIndex()).getQuick(constant);
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
valueIndex = symbolTableSource.getSymbolTable(arg.getColumnIndex()).getQuick(constant);
}
@Override
public boolean isConstant() {
return valueIndex == SymbolTable.VALUE_NOT_FOUND;
}
}
......
......@@ -105,12 +105,12 @@ public class SymbolInCursorFunctionFactory implements FunctionFactory {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
valueArg.init(recordCursor, executionContext);
cursorArg.init(recordCursor, executionContext);
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
valueArg.init(symbolTableSource, executionContext);
cursorArg.init(symbolTableSource, executionContext);
symbolKeys.clear();
SymbolTable symbolTable = recordCursor.getSymbolTable(columnIndex);
final SymbolTable symbolTable = symbolTableSource.getSymbolTable(columnIndex);
RecordCursorFactory factory = cursorArg.getRecordCursorFactory();
try (RecordCursor cursor = factory.getCursor(executionContext)) {
......@@ -150,9 +150,9 @@ public class SymbolInCursorFunctionFactory implements FunctionFactory {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
valueArg.init(recordCursor, executionContext);
cursorArg.init(recordCursor, executionContext);
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
valueArg.init(symbolTableSource, executionContext);
cursorArg.init(symbolTableSource, executionContext);
CharSequenceHashSet valueSet;
if (this.valueSet == this.valueSetA) {
......
......@@ -109,6 +109,12 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory {
public RecordCursor getCursor(SqlExecutionContext executionContext) {
dataMap.clear();
final RecordCursor baseCursor = base.getCursor(executionContext);
cursor.of(baseCursor);
// init all record function for this cursor, in case functions require metadata and/or symbol tables
for (int i = 0, m = recordFunctions.size(); i < m; i++) {
recordFunctions.getQuick(i).init(cursor, executionContext);
}
try {
final Record baseRecord = baseCursor.getRecord();
final int n = groupByFunctions.size();
......@@ -118,7 +124,8 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory {
MapValue value = key.createValue();
GroupByUtils.updateFunctions(groupByFunctions, n, value, baseRecord);
}
return initFunctionsAndCursor(executionContext, dataMap.getCursor(), baseCursor);
cursor.setMapCursor(dataMap.getCursor());
return cursor;
} catch (CairoException e) {
baseCursor.close();
throw e;
......@@ -135,20 +142,6 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory {
return true;
}
@NotNull
protected RecordCursor initFunctionsAndCursor(
SqlExecutionContext executionContext,
RecordCursor mapCursor,
RecordCursor baseCursor
) {
cursor.of(mapCursor, baseCursor);
// init all record function for this cursor, in case functions require metadata and/or symbol tables
for (int i = 0, m = recordFunctions.size(); i < m; i++) {
recordFunctions.getQuick(i).init(cursor, executionContext);
}
return cursor;
}
private static class GroupByRecordCursor implements RecordCursor {
private final VirtualRecord functionRecord;
private final IntIntHashMap symbolTableIndex;
......@@ -209,9 +202,12 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory {
mapCursor.toTop();
}
public void of(RecordCursor mapCursor, RecordCursor baseCursor) {
this.mapCursor = mapCursor;
public void of(RecordCursor baseCursor) {
this.baseCursor = baseCursor;
}
private void setMapCursor(RecordCursor mapCursor) {
this.mapCursor = mapCursor;
functionRecord.of(mapCursor.getRecord());
}
}
......
......@@ -24,8 +24,8 @@
package io.questdb.griffin.engine.groupby;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.SymbolFunction;
......@@ -51,7 +51,7 @@ public class MapSymbolColumn extends SymbolFunction {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext executionContext) {
this.symbolTable = recordCursor.getSymbolTable(cursorColumnIndex);
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
this.symbolTable = symbolTableSource.getSymbolTable(cursorColumnIndex);
}
}
......@@ -71,7 +71,7 @@ class DataFrameRecordCursor extends AbstractDataFrameRecordCursor {
this.rowCursorFactory.prepareCursor(dataFrameCursor.getTableReader());
rowCursor = null;
if (filter != null) {
filter.init(this, executionContext);
filter.init(dataFrameCursor, executionContext);
}
}
......
......@@ -55,11 +55,23 @@ class FilteredRecordCursor implements RecordCursor {
@Override
public boolean hasNext() {
if (filter.isConstant()) {
return filterIsConstant();
}
while (base.hasNext()) {
if (filter.getBool(record)) {
return true;
}
}
return false;
}
private boolean filterIsConstant() {
if (filter.getBool(record)) {
return base.hasNext();
}
return false;
}
......
......@@ -35,11 +35,10 @@ public class UnionAllRecordCursorFactory implements RecordCursorFactory {
private final UnionAllRecordCursor cursor;
public UnionAllRecordCursorFactory(
RecordMetadata metadata,
RecordCursorFactory masterFactory,
RecordCursorFactory slaveFactory
) {
this.metadata = metadata;
this.metadata = masterFactory.getMetadata();
this.masterFactory = masterFactory;
this.slaveFactory = slaveFactory;
......
......@@ -41,13 +41,12 @@ public class UnionRecordCursorFactory implements RecordCursorFactory {
public UnionRecordCursorFactory(
CairoConfiguration configuration,
RecordMetadata metadata,
RecordCursorFactory masterFactory,
RecordCursorFactory slaveFactory,
RecordSink recordSink,
ColumnTypes valueTypes
) {
this.metadata = metadata;
this.metadata = masterFactory.getMetadata();
this.masterFactory = masterFactory;
this.slaveFactory = slaveFactory;
......
......@@ -26,7 +26,7 @@ package io.questdb.griffin.engine.functions.str;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.BooleanFunction;
......@@ -85,7 +85,7 @@ public class TestMatchFunctionFactory implements FunctionFactory {
}
@Override
public void init(RecordCursor recordCursor, SqlExecutionContext sqlExecutionContext) {
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext sqlExecutionContext) {
openCounter.incrementAndGet();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册